Coverage for src / idx_api / routers / content_sources.py: 31%

295 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 11:09 -0700

1"""Content source management for multi-tenant blog content.""" 

2 

3import os 

4import re 

5import subprocess 

6import tempfile 

7from datetime import datetime, timezone 

8from pathlib import Path 

9 

10from fastapi import APIRouter, Depends, HTTPException, Query 

11from pydantic import BaseModel 

12from sqlalchemy import select 

13from sqlalchemy.orm import Session 

14 

15from idx_api.auth import AdminUser, BrokerUser 

16from idx_api.database import get_db 

17from idx_api.models.brokerage import Brokerage 

18from idx_api.models.brokerage_content_source import BrokerageContentSource 

19 

# Shared router for content-source endpoints; mounted by the application.
router = APIRouter()

21 

22 

23# ===== Response Models ===== 

24 

25 

class ContentSourceResponse(BaseModel):
    """Brokerage content source response model (credentials are never included)."""

    id: int
    brokerage_id: int
    git_url: str
    git_branch: str
    content_path: str | None
    auth_method: str
    enabled: bool
    last_synced_at: datetime | None
    last_sync_commit: str | None
    last_sync_error: str | None
    created_at: datetime
    updated_at: datetime
    # Include brokerage info for list endpoint (populated manually by handlers).
    brokerage_slug: str | None = None
    brokerage_name: str | None = None

    # Pydantic v2: the class-based `Config` is deprecated; the dict form of
    # model_config is equivalent and keeps ORM loading via model_validate().
    model_config = {"from_attributes": True}

47 

48 

class ContentSourceBuildResponse(BaseModel):
    """
    Content source response for build-time sync.

    Includes credentials - only accessible via API key authentication.
    """

    id: int
    brokerage_id: int
    brokerage_slug: str
    brokerage_name: str
    git_url: str
    git_branch: str
    content_path: str | None
    auth_method: str
    auth_credential: str | None  # Included for build-time cloning
    enabled: bool
    last_synced_at: datetime | None
    last_sync_commit: str | None

    # Pydantic v2: the class-based `Config` is deprecated; the dict form of
    # model_config is equivalent and keeps ORM loading via model_validate().
    model_config = {"from_attributes": True}

71 

72 

class ContentSourceCreate(BaseModel):
    """Content source creation/update request (body of the PUT upsert endpoint)."""

    git_url: str
    git_branch: str = "main"
    # Subdirectory of the repo holding MDX posts; None triggers auto-detection
    # during sync (see sync_git_repository).
    content_path: str | None = None
    auth_method: str = "none"  # "none", "token", "ssh_key", "deploy_key"
    auth_credential: str | None = None  # Token or key reference (stored encrypted)
    enabled: bool = True

82 

83 

class ContentSourceUpdate(BaseModel):
    """
    Content source update request (partial).

    Every field is optional; the PATCH handler applies only the fields the
    client explicitly sent (via model_dump(exclude_unset=True)).
    """

    git_url: str | None = None
    git_branch: str | None = None
    content_path: str | None = None
    auth_method: str | None = None
    auth_credential: str | None = None
    enabled: bool | None = None

93 

94 

class SyncedPost(BaseModel):
    """Information about a synced blog post (one MDX file in the repo)."""

    filename: str
    title: str | None = None  # Parsed from frontmatter; None when absent
    has_required_frontmatter: bool = True
    # NOTE(review): pydantic copies mutable defaults per instance, so this
    # shared [] is safe here (unlike a plain class attribute) — confirm against
    # the pinned pydantic version.
    warnings: list[str] = []

102 

103 

class ContentSourceSyncResult(BaseModel):
    """Result from a content sync operation."""

    success: bool
    brokerage_slug: str
    commit: str | None  # HEAD commit SHA of the synced checkout, when known
    message: str  # Human-readable summary of the sync outcome
    synced_at: datetime | None
    # Detailed sync information
    posts_found: list[SyncedPost] = []
    assets_found: list[str] = []
    warnings: list[str] = []
    errors: list[str] = []

117 

118 

119# ===== Content Sync Helpers ===== 

120 

121 

def parse_mdx_frontmatter(content: str) -> dict:
    """
    Parse YAML frontmatter from MDX content.

    Only flat `key: value` pairs are understood (no lists or nesting);
    surrounding single/double quotes are stripped from values. Returns an
    empty dict when the file has no `---`-delimited frontmatter block.
    """
    # Frontmatter must be a ``---`` block at the very start of the file.
    fm_match = re.match(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
    if fm_match is None:
        return {}

    parsed: dict = {}
    for raw_line in fm_match.group(1).split("\n"):
        stripped = raw_line.strip()
        # Skip blanks, YAML comments, and lines without a key separator.
        if not stripped or stripped.startswith("#") or ":" not in stripped:
            continue

        field_name, _, raw_value = stripped.partition(":")
        cleaned = raw_value.strip().strip('"').strip("'")
        if cleaned:
            parsed[field_name.strip()] = cleaned

    return parsed

151 

152 

def validate_mdx_post(filepath: Path, content: str) -> SyncedPost:
    """
    Validate an MDX blog post and return sync information.

    Emits a warning for each missing required field (title, pubDate), each
    missing recommended field (description, author, category), and a missing
    heroImage. The post is marked invalid only when a required field is absent.
    """
    frontmatter = parse_mdx_frontmatter(content)
    issues: list[str] = []

    # Required fields determine validity.
    missing_required = [f for f in ("title", "pubDate") if f not in frontmatter]
    issues.extend(f"Missing required field: {f}" for f in missing_required)

    # Recommended fields only produce warnings.
    issues.extend(
        f"Missing recommended field: {f}"
        for f in ("description", "author", "category")
        if f not in frontmatter
    )

    if "heroImage" not in frontmatter:
        issues.append("No heroImage specified - will use default")

    return SyncedPost(
        filename=filepath.name,
        title=frontmatter.get("title"),
        has_required_frontmatter=not missing_required,
        warnings=issues,
    )

188 

189 

def normalize_ssh_key(key: str) -> str:
    """
    Normalize a PEM/OpenSSH private key to ensure proper format.

    Keys require:
    - Header on its own line
    - Base64 content (collapsed to one line here)
    - Footer on its own line
    - Trailing newline

    This fixes keys that were pasted incorrectly (e.g., all on one line).
    Handles any "-----BEGIN <label>PRIVATE KEY-----" variant (OPENSSH, RSA,
    EC, DSA, PKCS#8 "PRIVATE KEY"), generalizing the previous OPENSSH/RSA-only
    handling. Unrecognized input is returned unchanged apart from a guaranteed
    trailing newline.
    """
    key = key.strip()

    # One pattern covers every PEM private-key label. The \2 backreference
    # forces the END label to match the BEGIN label, so mismatched
    # header/footer pairs fall through untouched.
    match = re.match(
        r"(-----BEGIN ([A-Z0-9 ]*)PRIVATE KEY-----)\s*(.+?)\s*(-----END \2PRIVATE KEY-----)",
        key,
        re.DOTALL,
    )

    if match:
        header, _label, body, footer = match.groups()
        # Remove any embedded newlines/spaces from the base64 payload,
        # then reconstruct with proper line breaks.
        body = re.sub(r"\s+", "", body)
        return f"{header}\n{body}\n{footer}\n"

    # Not a recognizable key block: ensure a trailing newline at minimum.
    return key if key.endswith("\n") else key + "\n"

234 

235 

def sync_git_repository(
    git_url: str,
    git_branch: str,
    content_path: str | None,
    auth_method: str,
    auth_credential: str | None,
    brokerage_slug: str,
) -> tuple[bool, str | None, list[SyncedPost], list[str], list[str], list[str]]:
    """
    Clone a git repository (shallow, single branch) and analyze its content.

    Args:
        git_url: HTTPS or SSH clone URL.
        git_branch: Branch to check out.
        content_path: Subdirectory containing MDX posts; auto-detected if None.
        auth_method: "none", "token", or "ssh_key" (deploy key material).
        auth_credential: Token or private-key text, depending on auth_method.
        brokerage_slug: Unused here; kept for interface stability with callers.

    Returns:
        (success, commit_sha, posts_found, assets_found, warnings, errors)
    """
    # Recognized image extensions for the asset scan.
    image_suffixes = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}

    posts_found: list[SyncedPost] = []
    assets_found: list[str] = []
    warnings: list[str] = []
    errors: list[str] = []
    commit_sha: str | None = None

    # Create temp directory for cloning; it (and any SSH key written into it)
    # is removed automatically when the context exits.
    with tempfile.TemporaryDirectory(prefix="content_sync_") as tmp_dir:
        repo_dir = Path(tmp_dir) / "repo"

        try:
            # Build git clone command
            env = os.environ.copy()
            clone_url = git_url

            if auth_method == "ssh_key" and auth_credential:
                # Write SSH key to a 0600 temp file, normalizing format.
                ssh_key_path = Path(tmp_dir) / "deploy_key"
                ssh_key_path.write_text(normalize_ssh_key(auth_credential))
                ssh_key_path.chmod(0o600)

                # Configure SSH to use the key. Host-key checking is disabled
                # because this ephemeral environment has no known_hosts.
                env["GIT_SSH_COMMAND"] = f"ssh -i {ssh_key_path} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"

            elif auth_method == "token" and auth_credential:
                # Insert token into HTTPS URL:
                # https://github.com/... -> https://token@github.com/...
                if git_url.startswith("https://"):
                    clone_url = git_url.replace("https://", f"https://{auth_credential}@")

            # Shallow single-branch clone keeps transfer and disk use minimal.
            result = subprocess.run(
                ["git", "clone", "--depth", "1", "--branch", git_branch, clone_url, str(repo_dir)],
                capture_output=True,
                text=True,
                timeout=60,
                env=env,
            )

            if result.returncode != 0:
                error_msg = result.stderr.strip() or result.stdout.strip() or "Git clone failed"
                # Sanitize error message so an embedded token never leaks.
                error_msg = re.sub(r"https://[^@]+@", "https://***@", error_msg)
                errors.append(f"Clone failed: {error_msg}")
                return False, None, posts_found, assets_found, warnings, errors

            # Get commit SHA of the checked-out HEAD (local op, but bounded
            # by a timeout for consistency with the clone).
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True,
                text=True,
                cwd=repo_dir,
                timeout=60,
            )
            if result.returncode == 0:
                commit_sha = result.stdout.strip()[:40]

            # Determine content directory.
            if content_path:
                content_dir = repo_dir / content_path
            else:
                # Try common layouts; "" means the repository root itself.
                for try_path in ["posts", "content/posts", "content", ""]:
                    test_dir = repo_dir / try_path if try_path else repo_dir
                    if test_dir.exists() and any(test_dir.glob("*.mdx")):
                        content_dir = test_dir
                        if try_path:
                            warnings.append(f"Auto-detected content path: {try_path}")
                        break
                else:
                    content_dir = repo_dir

            # Find and validate MDX files.
            if content_dir.exists():
                # Use a set to deduplicate (*.mdx overlaps with **/*.mdx in root).
                mdx_files = set(content_dir.glob("*.mdx")) | set(content_dir.glob("**/*.mdx"))

                for mdx_path in sorted(mdx_files, key=lambda p: p.name):
                    try:
                        content = mdx_path.read_text(encoding="utf-8")
                        posts_found.append(validate_mdx_post(mdx_path, content))
                    except Exception as e:
                        errors.append(f"Failed to read {mdx_path.name}: {str(e)}")
            else:
                warnings.append(f"Content path not found: {content_path or 'root'}")

            # Find image assets, preferring conventional asset directories.
            assets_dir = repo_dir / "assets" / "images"
            if not assets_dir.exists():
                assets_dir = repo_dir / "images"

            if assets_dir.exists():
                for asset in assets_dir.rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in image_suffixes:
                        assets_found.append(asset.name)
            else:
                # BUGFIX: pathlib glob patterns have no brace expansion, so the
                # previous rglob("*.{jpg,jpeg,...}") matched nothing. Walk the
                # tree and filter by suffix instead.
                for asset in repo_dir.rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in image_suffixes:
                        assets_found.append(asset.relative_to(repo_dir).as_posix())

            if not posts_found:
                warnings.append("No MDX posts found in repository")

            return True, commit_sha, posts_found, assets_found, warnings, errors

        except subprocess.TimeoutExpired:
            errors.append("Git operation timed out after 60 seconds")
            return False, None, posts_found, assets_found, warnings, errors
        except Exception as e:
            errors.append(f"Sync error: {str(e)}")
            return False, None, posts_found, assets_found, warnings, errors

362 

363 

364# ===== Content Source CRUD Endpoints ===== 

365 

366 

367@router.get("/content-sources") 

368async def list_content_sources( 

369 user: AdminUser, 

370 db: Session = Depends(get_db), 

371 enabled_only: bool = Query(False, description="Only return enabled sources"), 

372 include_credentials: bool = Query( 

373 False, 

374 description="Include auth credentials for build-time sync (API key auth required)", 

375 ), 

376): 

377 """ 

378 List all content sources across all brokerages. 

379 

380 This endpoint is primarily used by the build-time sync script to 

381 fetch all repositories that need to be cloned/pulled. 

382 

383 Set include_credentials=true to get auth credentials for cloning. 

384 Credentials are only returned with API key authentication for security. 

385 

386 Requires admin role. 

387 """ 

388 query = ( 

389 select(BrokerageContentSource, Brokerage.slug, Brokerage.name) 

390 .join(Brokerage, BrokerageContentSource.brokerage_id == Brokerage.id) 

391 .where(Brokerage.disabled_at.is_(None)) 

392 ) 

393 

394 if enabled_only: 

395 query = query.where(BrokerageContentSource.enabled == True) 

396 

397 results = db.execute(query).all() 

398 

399 items = [] 

400 for source, slug, name in results: 

401 if include_credentials: 

402 # Build response includes credentials for cloning 

403 items.append( 

404 ContentSourceBuildResponse( 

405 id=source.id, 

406 brokerage_id=source.brokerage_id, 

407 brokerage_slug=slug, 

408 brokerage_name=name, 

409 git_url=source.git_url, 

410 git_branch=source.git_branch, 

411 content_path=source.content_path, 

412 auth_method=source.auth_method, 

413 auth_credential=source.auth_credential, 

414 enabled=source.enabled, 

415 last_synced_at=source.last_synced_at, 

416 last_sync_commit=source.last_sync_commit, 

417 ) 

418 ) 

419 else: 

420 response = ContentSourceResponse.model_validate(source) 

421 response.brokerage_slug = slug 

422 response.brokerage_name = name 

423 items.append(response) 

424 

425 return { 

426 "items": items, 

427 "total": len(items), 

428 } 

429 

430 

431@router.get("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse | None) 

432async def get_brokerage_content_source( 

433 brokerage_id: int, 

434 user: BrokerUser, 

435 db: Session = Depends(get_db), 

436): 

437 """ 

438 Get the content source configuration for a specific brokerage. 

439 

440 Returns null if no content source is configured. 

441 

442 Authorization: 

443 - Admins can view any brokerage's content source 

444 - Brokers can only view their own brokerage's content source 

445 """ 

446 # Authorization check for non-admins 

447 if user.role != "admin" and user.brokerage_id != brokerage_id: 

448 raise HTTPException(status_code=403, detail="Cannot access another brokerage's content source") 

449 

450 # Verify brokerage exists 

451 brokerage = db.get(Brokerage, brokerage_id) 

452 if not brokerage or brokerage.disabled_at: 

453 raise HTTPException(status_code=404, detail="Brokerage not found") 

454 

455 # Get content source 

456 source = db.scalar( 

457 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id) 

458 ) 

459 

460 if not source: 

461 return None 

462 

463 response = ContentSourceResponse.model_validate(source) 

464 response.brokerage_slug = brokerage.slug 

465 response.brokerage_name = brokerage.name 

466 return response 

467 

468 

469@router.put("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse) 

470async def upsert_brokerage_content_source( 

471 brokerage_id: int, 

472 data: ContentSourceCreate, 

473 user: AdminUser, 

474 db: Session = Depends(get_db), 

475): 

476 """ 

477 Create or update the content source for a brokerage. 

478 

479 This is an upsert operation - if no content source exists, one is created. 

480 If one already exists, it is updated. 

481 

482 Requires admin role. 

483 """ 

484 # Verify brokerage exists 

485 brokerage = db.get(Brokerage, brokerage_id) 

486 if not brokerage or brokerage.disabled_at: 

487 raise HTTPException(status_code=404, detail="Brokerage not found") 

488 

489 # Check for existing content source 

490 source = db.scalar( 

491 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id) 

492 ) 

493 

494 if source: 

495 # Update existing 

496 for field, value in data.model_dump(exclude_unset=True).items(): 

497 setattr(source, field, value) 

498 source.updated_at = datetime.utcnow() 

499 else: 

500 # Create new 

501 source = BrokerageContentSource( 

502 brokerage_id=brokerage_id, 

503 **data.model_dump(), 

504 created_at=datetime.utcnow(), 

505 updated_at=datetime.utcnow(), 

506 ) 

507 db.add(source) 

508 

509 db.commit() 

510 db.refresh(source) 

511 

512 response = ContentSourceResponse.model_validate(source) 

513 response.brokerage_slug = brokerage.slug 

514 response.brokerage_name = brokerage.name 

515 return response 

516 

517 

518@router.patch("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse) 

519async def update_brokerage_content_source( 

520 brokerage_id: int, 

521 data: ContentSourceUpdate, 

522 user: AdminUser, 

523 db: Session = Depends(get_db), 

524): 

525 """ 

526 Partially update the content source for a brokerage. 

527 

528 Only provided fields are updated. Requires an existing content source. 

529 

530 Requires admin role. 

531 """ 

532 # Verify brokerage exists 

533 brokerage = db.get(Brokerage, brokerage_id) 

534 if not brokerage or brokerage.disabled_at: 

535 raise HTTPException(status_code=404, detail="Brokerage not found") 

536 

537 # Get existing content source 

538 source = db.scalar( 

539 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id) 

540 ) 

541 

542 if not source: 

543 raise HTTPException(status_code=404, detail="No content source configured for this brokerage") 

544 

545 # Update only provided fields 

546 update_data = data.model_dump(exclude_unset=True) 

547 for field, value in update_data.items(): 

548 setattr(source, field, value) 

549 

550 source.updated_at = datetime.utcnow() 

551 db.commit() 

552 db.refresh(source) 

553 

554 response = ContentSourceResponse.model_validate(source) 

555 response.brokerage_slug = brokerage.slug 

556 response.brokerage_name = brokerage.name 

557 return response 

558 

559 

560@router.delete("/brokerages/{brokerage_id}/content-source") 

561async def delete_brokerage_content_source( 

562 brokerage_id: int, 

563 user: AdminUser, 

564 db: Session = Depends(get_db), 

565): 

566 """ 

567 Remove the content source configuration for a brokerage. 

568 

569 This does not delete any synced content - that remains in the build cache 

570 until the next build cleans it up. 

571 

572 Requires admin role. 

573 """ 

574 # Verify brokerage exists 

575 brokerage = db.get(Brokerage, brokerage_id) 

576 if not brokerage or brokerage.disabled_at: 

577 raise HTTPException(status_code=404, detail="Brokerage not found") 

578 

579 # Get existing content source 

580 source = db.scalar( 

581 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id) 

582 ) 

583 

584 if not source: 

585 raise HTTPException(status_code=404, detail="No content source configured for this brokerage") 

586 

587 db.delete(source) 

588 db.commit() 

589 

590 return {"message": f"Content source removed for brokerage '{brokerage.slug}'"} 

591 

592 

593@router.post("/brokerages/{brokerage_id}/content-source/sync", response_model=ContentSourceSyncResult) 

594async def trigger_content_source_sync( 

595 brokerage_id: int, 

596 user: AdminUser, 

597 db: Session = Depends(get_db), 

598): 

599 """ 

600 Trigger a manual sync for a brokerage's content source. 

601 

602 This endpoint is for testing/debugging. In production, content is synced 

603 at build time by the sync script. 

604 

605 The sync process: 

606 1. Clones or pulls the git repository 

607 2. Copies content to the appropriate tenant directory 

608 3. Updates the last_synced_at and last_sync_commit fields 

609 

610 Requires admin role. 

611 """ 

612 # Verify brokerage exists 

613 brokerage = db.get(Brokerage, brokerage_id) 

614 if not brokerage or brokerage.disabled_at: 

615 raise HTTPException(status_code=404, detail="Brokerage not found") 

616 

617 # Get content source 

618 source = db.scalar( 

619 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id) 

620 ) 

621 

622 if not source: 

623 raise HTTPException(status_code=404, detail="No content source configured for this brokerage") 

624 

625 if not source.enabled: 

626 return ContentSourceSyncResult( 

627 success=False, 

628 brokerage_slug=brokerage.slug, 

629 commit=None, 

630 message="Content source is disabled", 

631 synced_at=None, 

632 ) 

633 

634 # Perform the actual git sync 

635 success, commit_sha, posts_found, assets_found, warnings, errors = sync_git_repository( 

636 git_url=source.git_url, 

637 git_branch=source.git_branch, 

638 content_path=source.content_path, 

639 auth_method=source.auth_method, 

640 auth_credential=source.auth_credential, 

641 brokerage_slug=brokerage.slug, 

642 ) 

643 

644 now = datetime.now(timezone.utc) 

645 

646 if success: 

647 # Update the content source with sync results 

648 source.last_synced_at = now 

649 source.last_sync_commit = commit_sha 

650 source.last_sync_error = None 

651 db.commit() 

652 

653 # Build success message 

654 post_count = len(posts_found) 

655 valid_count = sum(1 for p in posts_found if p.has_required_frontmatter) 

656 message = f"Synced {post_count} posts" 

657 if valid_count < post_count: 

658 message += f" ({valid_count} valid, {post_count - valid_count} with issues)" 

659 if assets_found: 

660 message += f", {len(assets_found)} assets" 

661 

662 return ContentSourceSyncResult( 

663 success=True, 

664 brokerage_slug=brokerage.slug, 

665 commit=commit_sha, 

666 message=message, 

667 synced_at=now, 

668 posts_found=posts_found, 

669 assets_found=assets_found, 

670 warnings=warnings, 

671 errors=errors, 

672 ) 

673 else: 

674 # Update with error info 

675 error_message = "; ".join(errors) if errors else "Unknown sync error" 

676 source.last_sync_error = error_message 

677 db.commit() 

678 

679 return ContentSourceSyncResult( 

680 success=False, 

681 brokerage_slug=brokerage.slug, 

682 commit=None, 

683 message=f"Sync failed: {error_message}", 

684 synced_at=None, 

685 posts_found=posts_found, 

686 assets_found=assets_found, 

687 warnings=warnings, 

688 errors=errors, 

689 )