Coverage for src/idx_api/vision.py: 0%
180 statements
1"""Vision service for property photo analysis.
3Uses Qwen3-VL via Ollama for generating buyer-focused property descriptions,
4and optionally SigLIP for direct visual embeddings.
6Configuration:
7- System-wide settings in config.py (vision_prompt_base, vision_default_vocabulary)
8- Per-brokerage overrides in BrokerageVisionSettings table
9"""
11import asyncio
12import base64
13import json
14import logging
15import re
16from io import BytesIO
17from pathlib import Path
18from typing import TYPE_CHECKING, Optional
20import httpx
21from PIL import Image
22from pydantic import BaseModel, Field
24from idx_api.config import settings
26if TYPE_CHECKING:
27 from idx_api.models.brokerage_vision_settings import BrokerageVisionSettings
30class ImageTags(BaseModel):
31 """Structured tags extracted from a property photo."""
32 room_type: str = Field(default="other")
33 features: list[str] = Field(default_factory=list)
34 materials: list[str] = Field(default_factory=list)
35 style: str = Field(default="")
36 condition: str = Field(default="")
37 highlights: list[str] = Field(default_factory=list)
38 quality_score: int = Field(default=3, ge=1, le=5)
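

# Illustrative sketch (not part of the public API): how ImageTags validates
# model output. quality_score is constrained to 1-5 by the Field bounds, so
# out-of-range values raise rather than passing through silently. The sample
# values below are hypothetical.
def _example_image_tags_validation() -> None:
    tags = ImageTags(
        room_type="kitchen",
        features=["island", "pantry"],
        materials=["granite", "hardwood"],
        quality_score=4,
    )
    assert tags.condition == ""  # unset fields fall back to their defaults

    try:
        ImageTags(quality_score=9)  # violates le=5
    except ValueError:
        pass  # pydantic's ValidationError subclasses ValueError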


logger = logging.getLogger(__name__)

# Real-estate-focused prompt for property photo analysis
REAL_ESTATE_PHOTO_PROMPT = """You are analyzing photos for a real estate listing website.
Potential home buyers will search for properties using natural language queries.

Describe this property photo focusing on features buyers search for:

**Room & Space**: What room/area is this? (kitchen, primary bedroom, backyard, etc.)

**Key Features Buyers Want**:
- Kitchen: countertop material, cabinet style, appliances, island, pantry
- Bathrooms: vanity style, shower/tub, tile work, double sinks
- Living areas: fireplace, built-ins, ceiling height, flooring type
- Bedrooms: size, closet, natural light, en-suite bathroom
- Exterior: pool, deck/patio, landscaping, views, garage, fencing
- Overall: condition (updated/original), style (modern/traditional/farmhouse)

**What would make a buyer excited about this space?**

Be specific and use terms buyers search for: "granite countertops", "stainless steel
appliances", "open floor plan", "mountain views", "covered patio", "walk-in closet".

Keep response under 150 words, focusing on searchable features."""

# Shorter prompt for batch processing
REAL_ESTATE_PHOTO_PROMPT_SHORT = """Describe this real estate photo for property search.
Include: room type, materials (countertops, flooring, cabinets), style (modern/traditional),
condition (updated/original), special features (pool, fireplace, views).
Use specific terms buyers search for. Keep under 100 words."""

# Structured tagging prompt for feature extraction
STRUCTURED_TAGGING_PROMPT = """Analyze this real estate photo and extract structured tags.

Return a JSON object with these fields:
{
    "room_type": "kitchen|bathroom|bedroom|living_room|dining|exterior|backyard|garage|basement|office|other",
    "features": ["list", "of", "visible", "features"],
    "materials": ["granite", "hardwood", "tile", "etc"],
    "style": "modern|traditional|farmhouse|craftsman|contemporary|mid_century|rustic",
    "condition": "new|updated|original|dated",
    "highlights": ["buyer-exciting", "features"],
    "quality_score": 1-5
}

For features, use searchable terms buyers look for:
- Kitchen: island, pantry, breakfast bar, double oven, gas range
- Bathroom: soaking tub, walk-in shower, double vanity, jetted tub
- Living: fireplace, built-ins, vaulted ceiling, open floor plan
- Exterior: pool, covered patio, deck, mountain views, workshop, RV parking
- Garage: shop, workbench, storage, oversized, 3-car

Only include features clearly visible in the photo. Return valid JSON only."""


def build_vision_prompt(
    brokerage_settings: Optional["BrokerageVisionSettings"] = None,
) -> str:
    """Build the vision analysis prompt from config and brokerage overrides.

    Args:
        brokerage_settings: Optional per-brokerage customization

    Returns:
        Complete prompt string for the vision model
    """
    # Start with system defaults from config
    vocabulary = settings.vision_default_vocabulary

    # Build additional instructions
    additional = ""

    if brokerage_settings:
        # Merge custom vocabulary
        if brokerage_settings.custom_vocabulary:
            vocabulary = f"{vocabulary}\n{brokerage_settings.custom_vocabulary}"

        # Get brokerage-specific instructions
        additional = brokerage_settings.get_prompt_additions()

    # Build the final prompt from the template
    prompt = settings.vision_prompt_base.format(
        additional_instructions=additional,
        feature_vocabulary=vocabulary,
    )

    return prompt
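

# Sketch of the template shape that build_vision_prompt() assumes. The real
# default lives in idx_api.config as settings.vision_prompt_base and may
# differ; the only hard requirement is the two placeholders consumed by the
# .format() call above.
_EXAMPLE_PROMPT_BASE = (
    "Describe this real estate photo for property search.\n"
    "{additional_instructions}\n"
    "Prefer these searchable terms where they apply:\n"
    "{feature_vocabulary}"
)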


def get_vision_config(
    brokerage_settings: Optional["BrokerageVisionSettings"] = None,
) -> dict:
    """Get vision configuration, merging system defaults with brokerage overrides.

    Args:
        brokerage_settings: Optional per-brokerage customization

    Returns:
        Dict with vision configuration values
    """
    config = {
        "enabled": settings.vision_enabled,
        "model": settings.vision_model,
        "max_photos": settings.vision_max_photos_per_property,
        "image_max_size": settings.vision_image_max_size,
        "timeout": settings.vision_request_timeout,
        "max_retries": settings.vision_max_retries,
        "concurrency": settings.vision_concurrency,
    }

    if brokerage_settings:
        # Override with brokerage settings where specified
        if not brokerage_settings.vision_enabled:
            config["enabled"] = False
        if brokerage_settings.max_photos_per_property:
            config["max_photos"] = brokerage_settings.max_photos_per_property

    return config
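

# Illustrative sketch: how the merged config and prompt are typically consumed
# together. The brokerage row is hypothetical; in practice it would be loaded
# from the BrokerageVisionSettings table.
def _example_vision_config(
    brokerage: Optional["BrokerageVisionSettings"],
) -> tuple[dict, str]:
    config = get_vision_config(brokerage)
    prompt = build_vision_prompt(brokerage)
    if not config["enabled"]:
        raise RuntimeError("vision analysis disabled for this brokerage")
    return config, prompt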


async def download_image(url: str, timeout: float = 30.0) -> bytes:
    """Download an image from a URL and return it as bytes.

    Args:
        url: Image URL to download
        timeout: Request timeout in seconds

    Returns:
        Image data as bytes

    Raises:
        httpx.HTTPError: If the download fails
    """
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.content


def resize_image_if_needed(image_data: bytes, max_size: int = 1024) -> bytes:
    """Resize an image if it exceeds max dimensions, to reduce API payload.

    Args:
        image_data: Original image bytes
        max_size: Maximum dimension (width or height)

    Returns:
        Re-encoded image bytes (JPEG format), resized if needed
    """
    img = Image.open(BytesIO(image_data))

    # Convert to RGB if necessary (for PNG with alpha, etc.)
    if img.mode in ("RGBA", "LA", "P"):
        img = img.convert("RGB")

    # Only resize if larger than max_size
    if max(img.size) > max_size:
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

    # Always re-encode as JPEG for a smaller payload
    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=85)
    return buffer.getvalue()
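

# Illustrative sketch of the download -> resize -> base64 prep pipeline that
# both describe_image() and analyze_image_structured() perform inline. The
# URL is a placeholder.
async def _example_prepare_image(url: str = "https://photos.example.com/1.jpg") -> str:
    raw = await download_image(url)
    small = resize_image_if_needed(raw, max_size=1024)
    return base64.b64encode(small).decode("utf-8")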


async def describe_image(
    image_url: str,
    prompt: str = REAL_ESTATE_PHOTO_PROMPT_SHORT,
    max_retries: int = 2,
) -> Optional[str]:
    """Generate a description of a property photo using Qwen3-VL.

    Args:
        image_url: URL of the property photo
        prompt: System prompt for the vision model
        max_retries: Number of retry attempts on failure

    Returns:
        Text description of the image, or None if all attempts failed
    """
    if not settings.vision_enabled:
        logger.warning("Vision model disabled in settings")
        return None

    for attempt in range(max_retries + 1):
        try:
            # Download and prepare image
            image_data = await download_image(image_url)
            image_data = resize_image_if_needed(image_data)
            image_b64 = base64.b64encode(image_data).decode("utf-8")

            # Call the Ollama vision API
            async with httpx.AsyncClient(timeout=120.0) as client:
                response = await client.post(
                    f"{settings.ollama_base_url}/api/generate",
                    json={
                        "model": settings.vision_model,
                        "prompt": prompt,
                        "images": [image_b64],
                        "stream": False,
                    },
                )
                response.raise_for_status()
                result = response.json()
                return result.get("response", "").strip()

        except httpx.HTTPError as e:
            logger.warning(f"Attempt {attempt + 1} failed for {image_url}: {e}")
            if attempt < max_retries:
                await asyncio.sleep(1 * (attempt + 1))  # Linear backoff: 1s, 2s, ...
            else:
                logger.error(f"Failed to describe image after {max_retries + 1} attempts: {image_url}")
                return None

        except Exception as e:
            logger.error(f"Unexpected error describing image {image_url}: {e}")
            return None
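

# Illustrative usage sketch: describe_image() is a coroutine, so it needs an
# event loop. The URL is a placeholder.
def _example_describe_one() -> None:
    description = asyncio.run(
        describe_image("https://photos.example.com/kitchen.jpg")
    )
    print(description or "<vision disabled or request failed>")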


async def batch_describe_images(
    image_urls: list[str],
    prompt: str = REAL_ESTATE_PHOTO_PROMPT_SHORT,
    concurrency: int = 3,
) -> dict[str, Optional[str]]:
    """Generate descriptions for multiple images concurrently.

    Args:
        image_urls: List of image URLs to describe
        prompt: System prompt for the vision model
        concurrency: Maximum concurrent requests

    Returns:
        Dict mapping URL to description (or None if failed)
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def describe_with_semaphore(url: str) -> tuple[str, Optional[str]]:
        async with semaphore:
            description = await describe_image(url, prompt)
            return url, description

    tasks = [describe_with_semaphore(url) for url in image_urls]
    results = await asyncio.gather(*tasks)

    return dict(results)
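

# Illustrative usage sketch for the batch helper: the semaphore inside
# batch_describe_images() caps in-flight Ollama requests at `concurrency`,
# while failed photos simply map to None. The URLs are placeholders.
async def _example_batch_describe() -> None:
    urls = [
        "https://photos.example.com/1.jpg",
        "https://photos.example.com/2.jpg",
    ]
    by_url = await batch_describe_images(urls, concurrency=2)
    failures = [url for url, text in by_url.items() if text is None]
    logger.info("described %d/%d photos", len(urls) - len(failures), len(urls))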


async def analyze_image_structured(
    image_url: str,
    max_retries: Optional[int] = None,
    brokerage_settings: Optional["BrokerageVisionSettings"] = None,
) -> Optional[ImageTags]:
    """Analyze a property photo and extract structured tags.

    Uses Qwen3-VL to extract room type, features, materials, style, and a
    quality score in a structured JSON format.

    Args:
        image_url: URL of the property photo
        max_retries: Number of retry attempts (defaults to config setting)
        brokerage_settings: Optional per-brokerage customization

    Returns:
        ImageTags object with structured data, or None if failed
    """
    # Get merged configuration
    config = get_vision_config(brokerage_settings)

    if not config["enabled"]:
        logger.warning("Vision model disabled in settings")
        return None

    # Build prompt from config and brokerage overrides
    prompt = build_vision_prompt(brokerage_settings)

    # Use provided retries or the config default
    retries = max_retries if max_retries is not None else config["max_retries"]

    for attempt in range(retries + 1):
        try:
            # Download and prepare image
            image_data = await download_image(image_url)
            image_data = resize_image_if_needed(image_data, max_size=config["image_max_size"])
            image_b64 = base64.b64encode(image_data).decode("utf-8")

            # Call the Ollama vision API with the structured prompt.
            # Note: don't use format:"json" - it breaks qwen3-vl responses.
            async with httpx.AsyncClient(timeout=config["timeout"]) as client:
                response = await client.post(
                    f"{settings.ollama_base_url}/api/generate",
                    json={
                        "model": config["model"],
                        "prompt": prompt,
                        "images": [image_b64],
                        "stream": False,
                    },
                )
                response.raise_for_status()
                result = response.json()
                raw_response = result.get("response", "").strip()

            # Try to parse JSON from the response
            try:
                # Handle markdown code blocks if present
                json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', raw_response, re.DOTALL)
                if json_match:
                    json_str = json_match.group(1)
                else:
                    json_str = raw_response

                parsed = json.loads(json_str)
                return ImageTags(**parsed)

            except (json.JSONDecodeError, ValueError) as e:
                logger.warning(f"Failed to parse JSON from vision response: {e}")
                # Fall back to a minimal ImageTags with just the room type
                room = extract_room_type(raw_response)
                return ImageTags(room_type=room or "other")

        except httpx.HTTPError as e:
            logger.warning(f"Attempt {attempt + 1} failed for {image_url}: {e}")
            if attempt < retries:
                await asyncio.sleep(1 * (attempt + 1))  # Linear backoff: 1s, 2s, ...
            else:
                logger.error(f"Failed to analyze image after {retries + 1} attempts: {image_url}")
                return None

        except Exception as e:
            logger.error(f"Unexpected error analyzing image {image_url}: {e}")
            return None

    return None
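

# Illustrative sketch of the fenced-JSON fallback above: vision models often
# wrap JSON in a markdown code block, which the regex strips before parsing.
# The sample response string is hypothetical.
def _example_parse_fenced_json() -> ImageTags:
    raw = '```json\n{"room_type": "kitchen", "features": ["island"]}\n```'
    match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', raw, re.DOTALL)
    parsed = json.loads(match.group(1) if match else raw)
    return ImageTags(**parsed)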


async def batch_analyze_images_structured(
    image_urls: list[str],
    concurrency: int = 3,
) -> dict[str, Optional[ImageTags]]:
    """Analyze multiple images and extract structured tags concurrently.

    Args:
        image_urls: List of image URLs to analyze
        concurrency: Maximum concurrent requests

    Returns:
        Dict mapping URL to ImageTags (or None if failed)
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def analyze_with_semaphore(url: str) -> tuple[str, Optional[ImageTags]]:
        async with semaphore:
            tags = await analyze_image_structured(url)
            return url, tags

    tasks = [analyze_with_semaphore(url) for url in image_urls]
    results = await asyncio.gather(*tasks)

    return dict(results)


def extract_room_type(description: str) -> Optional[str]:
    """Extract the room type from a description.

    Args:
        description: Image description text

    Returns:
        Detected room type, or None if no keyword matched
    """
    room_keywords = {
        "kitchen": ["kitchen", "countertop", "cabinet", "appliance", "pantry", "island"],
        "living_room": ["living room", "living area", "family room", "fireplace", "great room"],
        "bedroom": ["bedroom", "master", "primary", "guest room", "bed"],
        "bathroom": ["bathroom", "bath", "vanity", "shower", "tub", "toilet"],
        "dining": ["dining", "breakfast nook", "eat-in"],
        "exterior": ["exterior", "front", "facade", "yard", "lawn", "garden", "curb"],
        "backyard": ["backyard", "back yard", "patio", "deck", "pool", "outdoor"],
        "garage": ["garage", "carport", "parking"],
        "basement": ["basement", "lower level", "rec room"],
        "office": ["office", "study", "den", "workspace"],
    }

    description_lower = description.lower()

    for room_type, keywords in room_keywords.items():
        for keyword in keywords:
            if keyword in description_lower:
                return room_type

    return None
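

# Illustrative sketch: keyword matching is first-match-wins in dict order, so
# a caption mentioning both "kitchen" and "patio" resolves to "kitchen".
def _example_room_type() -> None:
    assert extract_room_type("Updated kitchen with granite island") == "kitchen"
    assert extract_room_type("Spa-like soaking tub and double vanity") == "bathroom"
    assert extract_room_type("Aerial drone shot of the neighborhood") is None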


# SigLIP visual embedding functions (Approach B).
# These will be enabled when the SigLIP service is set up.

async def get_visual_embedding(image_url: str) -> Optional[list[float]]:
    """Get a SigLIP visual embedding for an image.

    Args:
        image_url: URL of the image

    Returns:
        1024-dimensional embedding vector, or None if SigLIP is disabled or the request failed
    """
    if not settings.siglip_enabled:
        return None

    try:
        # Download and prepare image
        image_data = await download_image(image_url)
        image_data = resize_image_if_needed(image_data, max_size=384)  # SigLIP native size
        image_b64 = base64.b64encode(image_data).decode("utf-8")

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{settings.siglip_base_url}/embed/image",
                json={"image": image_b64},
            )
            response.raise_for_status()
            result = response.json()
            return result.get("embedding")

    except Exception as e:
        logger.error(f"Failed to get visual embedding for {image_url}: {e}")
        return None


async def batch_get_visual_embeddings(
    image_urls: list[str],
    concurrency: int = 5,
) -> dict[str, Optional[list[float]]]:
    """Get SigLIP visual embeddings for multiple images.

    Args:
        image_urls: List of image URLs
        concurrency: Maximum concurrent requests

    Returns:
        Dict mapping URL to embedding vector (or None if failed)
    """
    if not settings.siglip_enabled:
        return {url: None for url in image_urls}

    semaphore = asyncio.Semaphore(concurrency)

    async def embed_with_semaphore(url: str) -> tuple[str, Optional[list[float]]]:
        async with semaphore:
            embedding = await get_visual_embedding(url)
            return url, embedding

    tasks = [embed_with_semaphore(url) for url in image_urls]
    results = await asyncio.gather(*tasks)

    return dict(results)


async def get_siglip_text_embedding(query: str) -> Optional[list[float]]:
    """Get a SigLIP text embedding for a search query.

    This embeds text in the same vector space as images, enabling
    direct text-to-image similarity search.

    Args:
        query: Search query text (e.g., "granite countertops")

    Returns:
        1024-dimensional embedding vector, or None if SigLIP is disabled or the request failed
    """
    if not settings.siglip_enabled:
        return None

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{settings.siglip_base_url}/embed/text",
                json={"text": query},
            )
            response.raise_for_status()
            result = response.json()
            return result.get("embedding")

    except Exception as e:
        logger.error(f"Failed to get SigLIP text embedding: {e}")
        return None
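

# Illustrative sketch: because SigLIP embeds text and images into the same
# vector space, ranking photos for a query reduces to a cosine-similarity
# sort. This assumes nothing about whether the /embed endpoints return
# normalized vectors; the explicit norms below handle either case.
async def _example_rank_photos(query: str, photo_urls: list[str]) -> list[str]:
    query_vec = await get_siglip_text_embedding(query)
    if query_vec is None:
        return []

    def cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm = (sum(x * x for x in a) ** 0.5) * (sum(y * y for y in b) ** 0.5)
        return dot / norm if norm else 0.0

    embeddings = await batch_get_visual_embeddings(photo_urls)
    scored = [(url, cosine(query_vec, vec)) for url, vec in embeddings.items() if vec]
    return [url for url, _ in sorted(scored, key=lambda pair: pair[1], reverse=True)]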