Coverage for src / idx_api / routers / vision.py: 59%
124 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-28 11:09 -0700
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-28 11:09 -0700
1"""Vision extraction admin endpoints."""
3import asyncio
4from datetime import datetime
5from typing import Optional
7from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
8from pydantic import BaseModel
9from sqlalchemy import desc, select, text
10from sqlalchemy.orm import Session
12from idx_api.auth import AdminUser
13from idx_api.config import settings
14from idx_api.database import get_db
15from idx_api.embeddings.images import get_image_index_stats, get_unindexed_property_images
16from idx_api.models.vision_job import VisionExtractionJob
18router = APIRouter()
21# --- Response Models ---
class ImageIndexStats(BaseModel):
    """Snapshot of the image index counters returned by the status endpoint."""

    # Property-level counters.
    total_properties: int
    properties_with_images: int

    # Image-level counters.
    total_images: int
    description_embeddings: int
    visual_embeddings: int

    # Percentage value; the status endpoint supplies it rounded to one decimal.
    coverage_percent: float
class TopFeature(BaseModel):
    """A (feature_type, feature_value) tag pair and how many times it occurs."""

    feature_type: str
    feature_value: str
    count: int
class RoomTypeStats(BaseModel):
    """Row count for a single room type plus its share of all counted rows."""

    room_type: str
    count: int
    # Share in percent (0-100 scale).
    percent: float
class VisionStatusResponse(BaseModel):
    """Full status payload describing the vision extraction system."""

    # Configuration values echoed back to the caller.
    enabled: bool
    model: str
    ollama_url: str

    # Index counters and the number of properties still unindexed.
    stats: ImageIndexStats
    unindexed_count: int

    # Aggregations over extracted tags and room types.
    top_features: list[TopFeature]
    room_types: list[RoomTypeStats]

    # Details of the pending/running job, or None when there is no such job.
    active_job: Optional[dict] = None
class JobSummary(BaseModel):
    """Public view of a single extraction job record."""

    # Identity and lifecycle.
    id: int
    status: str
    started_at: datetime
    ended_at: Optional[datetime]
    duration_seconds: Optional[float]

    # Work counters.
    total_properties: int
    processed_properties: int
    total_images: int
    processed_images: int
    failed_images: int
    tags_created: int
    progress_percent: float

    # Who started the job, when recorded.
    triggered_by: Optional[str]

    class Config:
        # Pydantic option: permit building the model from object attributes.
        from_attributes = True
class JobHistoryResponse(BaseModel):
    """One page of job summaries together with the overall job count."""

    # Jobs contained in the requested page.
    jobs: list[JobSummary]
    # Number of jobs across all pages.
    total: int
class StartJobRequest(BaseModel):
    """Options accepted when creating an extraction job; all are optional."""

    # Upper bound on the number of properties; None leaves it to the endpoint.
    limit: Optional[int] = None
    # Stored on the job record as its brokerage_id.
    brokerage_id: Optional[int] = None
    # Stored on the job record as skip_visual_embeddings.
    skip_visual: bool = True
class StartJobResponse(BaseModel):
    """Acknowledgement returned once a job record has been created."""

    # Primary key of the new job.
    job_id: int
    # Human-readable summary of what was queued.
    message: str
113# --- Endpoints ---
@router.get("/status", response_model=VisionStatusResponse)
async def get_vision_status(user: AdminUser, db: Session = Depends(get_db)):
    """Assemble the status payload for the vision extraction system.

    The response combines the image index counters, the number of unindexed
    properties (the helper is called with ``limit=1000``), the 15 most
    frequent feature tags, the room type distribution and, when one exists,
    the most recently started pending/running job.

    Args:
        user: Admin user dependency; not read inside the handler body.
        db: Database session used for every query below.

    Returns:
        A populated ``VisionStatusResponse``.
    """
    # Index counters plus the share of properties that have images.
    raw = get_image_index_stats(db)
    if raw["total_properties"] > 0:
        coverage = raw["properties_with_images"] / raw["total_properties"] * 100
    else:
        coverage = 0

    stats = ImageIndexStats(
        total_properties=raw["total_properties"],
        properties_with_images=raw["properties_with_images"],
        total_images=raw["total_images"],
        description_embeddings=raw["description_embeddings"],
        visual_embeddings=raw["visual_embeddings"],
        coverage_percent=round(coverage, 1),
    )

    # Number of properties the helper reports as unindexed.
    unindexed_count = len(get_unindexed_property_images(db, limit=1000))

    # Most frequent feature tags.
    feature_sql = text("""
        SELECT feature_type, feature_value, COUNT(*) as cnt
        FROM image_feature_tags
        GROUP BY feature_type, feature_value
        ORDER BY cnt DESC
        LIMIT 15
    """)
    top_features = []
    for tag in db.execute(feature_sql).fetchall():
        top_features.append(
            TopFeature(
                feature_type=tag.feature_type,
                feature_value=tag.feature_value,
                count=tag.cnt,
            )
        )

    # Room type distribution over images that have a room type.
    room_sql = text("""
        SELECT room_type, COUNT(*) as cnt
        FROM property_images
        WHERE room_type IS NOT NULL
        GROUP BY room_type
        ORDER BY cnt DESC
    """)
    room_rows = db.execute(room_sql).fetchall()
    room_total = sum(entry.cnt for entry in room_rows)
    room_types = []
    for entry in room_rows:
        share = round(entry.cnt / room_total * 100, 1) if room_total > 0 else 0
        room_types.append(
            RoomTypeStats(room_type=entry.room_type, count=entry.cnt, percent=share)
        )

    # Latest job that is still pending or running, if any.
    active_query = (
        select(VisionExtractionJob)
        .where(VisionExtractionJob.status.in_(["pending", "running"]))
        .order_by(desc(VisionExtractionJob.started_at))
        .limit(1)
    )
    job = db.execute(active_query).scalar_one_or_none()
    active_job = None
    if job:
        active_job = {
            "id": job.id,
            "status": job.status,
            "progress_percent": job.progress_percent,
            "processed_images": job.processed_images,
            "total_images": job.total_images,
            "started_at": job.started_at.isoformat() if job.started_at else None,
        }

    return VisionStatusResponse(
        enabled=settings.vision_enabled,
        model=settings.vision_model,
        ollama_url=settings.ollama_base_url,
        stats=stats,
        unindexed_count=unindexed_count,
        top_features=top_features,
        room_types=room_types,
        active_job=active_job,
    )
@router.get("/jobs", response_model=JobHistoryResponse)
async def get_job_history(
    user: AdminUser,
    db: Session = Depends(get_db),
    limit: int = 20,
    offset: int = 0,
):
    """Return one page of extraction jobs, newest ``started_at`` first.

    Args:
        user: Admin user dependency; not read inside the handler body.
        db: Database session.
        limit: Maximum number of jobs in the page.
        offset: Number of jobs to skip before the page starts.

    Returns:
        ``JobHistoryResponse`` holding the page of summaries and the total
        number of job rows.
    """
    # Local import so this block does not rely on a file-level import change.
    from sqlalchemy import func

    # Count in the database instead of fetching every id and measuring the list.
    total = db.execute(
        select(func.count()).select_from(VisionExtractionJob)
    ).scalar_one()

    # Requested page, most recently started jobs first.
    page_query = (
        select(VisionExtractionJob)
        .order_by(desc(VisionExtractionJob.started_at))
        .limit(limit)
        .offset(offset)
    )
    jobs = db.execute(page_query).scalars().all()

    return JobHistoryResponse(
        jobs=[
            JobSummary(
                id=job.id,
                status=job.status,
                started_at=job.started_at,
                ended_at=job.ended_at,
                duration_seconds=job.duration_seconds,
                total_properties=job.total_properties,
                processed_properties=job.processed_properties,
                total_images=job.total_images,
                processed_images=job.processed_images,
                failed_images=job.failed_images,
                tags_created=job.tags_created,
                progress_percent=job.progress_percent,
                triggered_by=job.triggered_by,
            )
            for job in jobs
        ],
        total=total,
    )
@router.get("/jobs/{job_id}", response_model=JobSummary)
async def get_job_detail(job_id: int, user: AdminUser, db: Session = Depends(get_db)):
    """Look up one extraction job by id and return its summary.

    Args:
        job_id: Primary key of the job to fetch.
        user: Admin user dependency; not read inside the handler body.
        db: Database session.

    Raises:
        HTTPException: 404 when no job has the given id.
    """
    lookup = select(VisionExtractionJob).where(VisionExtractionJob.id == job_id)
    job = db.execute(lookup).scalar_one_or_none()

    if job:
        # Copy the summary fields from the record, in declaration order.
        summary_fields = (
            "id",
            "status",
            "started_at",
            "ended_at",
            "duration_seconds",
            "total_properties",
            "processed_properties",
            "total_images",
            "processed_images",
            "failed_images",
            "tags_created",
            "progress_percent",
            "triggered_by",
        )
        return JobSummary(**{name: getattr(job, name) for name in summary_fields})

    raise HTTPException(status_code=404, detail="Job not found")
# Note: this endpoint only creates the job record. Executing the job is left
# to a separate worker process that polls for pending jobs.
@router.post("/jobs", response_model=StartJobResponse)
async def start_extraction_job(
    request: StartJobRequest,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Create a new extraction job record in ``pending`` state.

    Note: This creates the job record. A separate worker process
    should poll for pending jobs and execute them.

    Args:
        request: Job options (property limit, brokerage id, skip-visual flag).
        user: Admin user; ``user.email`` is stored as ``triggered_by``.
        db: Database session.

    Returns:
        ``StartJobResponse`` with the new job id and a summary message.

    Raises:
        HTTPException: 409 when a pending or running job already exists;
            400 when no unindexed properties are found.
    """
    # Refuse to queue a second job while one is pending or running. The
    # query is limited to one row so that several active rows still produce
    # the 409 below rather than a MultipleResultsFound error.
    result = db.execute(
        select(VisionExtractionJob)
        .where(VisionExtractionJob.status.in_(["pending", "running"]))
        .limit(1)
    )
    active = result.scalar_one_or_none()
    if active:
        raise HTTPException(
            status_code=409, detail=f"Job {active.id} is already {active.status}"
        )

    # Size the job from the unindexed properties; a missing (or zero) limit
    # falls back to 10000.
    unindexed = get_unindexed_property_images(db, limit=request.limit or 10000)
    total_properties = len(unindexed)
    total_images = sum(len(p["photos"]) for p in unindexed)

    if total_properties == 0:
        raise HTTPException(status_code=400, detail="No unindexed properties found")

    # Persist the job record.
    job = VisionExtractionJob(
        status="pending",
        started_at=datetime.utcnow(),
        brokerage_id=request.brokerage_id,
        limit_properties=request.limit,
        total_properties=total_properties,
        total_images=total_images,
        vision_model=settings.vision_model,
        skip_visual_embeddings=request.skip_visual,
        triggered_by=user.email,
    )
    db.add(job)
    db.commit()
    # Reload the row so the generated id is available for the response.
    db.refresh(job)

    return StartJobResponse(
        job_id=job.id,
        message=f"Job created: {total_properties} properties, {total_images} images queued",
    )
@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int, user: AdminUser, db: Session = Depends(get_db)):
    """Mark a pending or running job as cancelled.

    Only the job record is updated here: its status becomes ``cancelled``
    and ``ended_at`` is set to the current UTC time.

    Args:
        job_id: Primary key of the job to cancel.
        user: Admin user dependency; not read inside the handler body.
        db: Database session.

    Raises:
        HTTPException: 404 when the job does not exist; 400 when its status
            is neither ``pending`` nor ``running``.
    """
    lookup = select(VisionExtractionJob).where(VisionExtractionJob.id == job_id)
    job = db.execute(lookup).scalar_one_or_none()

    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    is_cancellable = job.status in ["pending", "running"]
    if not is_cancellable:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot cancel job with status: {job.status}",
        )

    # Record the cancellation and when it happened.
    job.status = "cancelled"
    job.ended_at = datetime.utcnow()
    db.commit()

    return {"message": f"Job {job_id} cancelled"}