Coverage for src/idx_api/routers/content_sources.py: 31% (295 statements)
1"""Content source management for multi-tenant blog content."""
3import os
4import re
5import subprocess
6import tempfile
7from datetime import datetime, timezone
8from pathlib import Path
10from fastapi import APIRouter, Depends, HTTPException, Query
11from pydantic import BaseModel
12from sqlalchemy import select
13from sqlalchemy.orm import Session
15from idx_api.auth import AdminUser, BrokerUser
16from idx_api.database import get_db
17from idx_api.models.brokerage import Brokerage
18from idx_api.models.brokerage_content_source import BrokerageContentSource
20router = APIRouter()

# ===== Response Models =====


class ContentSourceResponse(BaseModel):
    """Brokerage content source response model."""

    id: int
    brokerage_id: int
    git_url: str
    git_branch: str
    content_path: str | None
    auth_method: str
    enabled: bool
    last_synced_at: datetime | None
    last_sync_commit: str | None
    last_sync_error: str | None
    created_at: datetime
    updated_at: datetime
    # Include brokerage info for the list endpoint
    brokerage_slug: str | None = None
    brokerage_name: str | None = None

    model_config = ConfigDict(from_attributes=True)

class ContentSourceBuildResponse(BaseModel):
    """
    Content source response for build-time sync.

    Includes credentials - only accessible via API key authentication.
    """

    id: int
    brokerage_id: int
    brokerage_slug: str
    brokerage_name: str
    git_url: str
    git_branch: str
    content_path: str | None
    auth_method: str
    auth_credential: str | None  # Included for build-time cloning
    enabled: bool
    last_synced_at: datetime | None
    last_sync_commit: str | None

    model_config = ConfigDict(from_attributes=True)

class ContentSourceCreate(BaseModel):
    """Content source creation/update request."""

    git_url: str
    git_branch: str = "main"
    content_path: str | None = None
    auth_method: str = "none"  # "none", "token", "ssh_key", "deploy_key"
    auth_credential: str | None = None  # Token or key reference (stored encrypted)
    enabled: bool = True


class ContentSourceUpdate(BaseModel):
    """Content source update request (partial)."""

    git_url: str | None = None
    git_branch: str | None = None
    content_path: str | None = None
    auth_method: str | None = None
    auth_credential: str | None = None
    enabled: bool | None = None


class SyncedPost(BaseModel):
    """Information about a synced blog post."""

    filename: str
    title: str | None = None
    has_required_frontmatter: bool = True
    warnings: list[str] = []


class ContentSourceSyncResult(BaseModel):
    """Result from a content sync operation."""

    success: bool
    brokerage_slug: str
    commit: str | None
    message: str
    synced_at: datetime | None
    # Detailed sync information
    posts_found: list[SyncedPost] = []
    assets_found: list[str] = []
    warnings: list[str] = []
    errors: list[str] = []

# ===== Content Sync Helpers =====


def parse_mdx_frontmatter(content: str) -> dict:
    """
    Parse YAML frontmatter from MDX content.

    Returns a dict of parsed fields, or an empty dict if no frontmatter is found.
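
    Example (illustrative; handles flat key: value lines only, not nested YAML):

        >>> parse_mdx_frontmatter('''---
        ... title: Hello
        ... pubDate: 2025-01-01
        ... ---
        ... Body text
        ... ''')
        {'title': 'Hello', 'pubDate': '2025-01-01'}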
127 """
128 # Match --- delimited frontmatter at start of file
129 match = re.match(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
130 if not match:
131 return {}
133 frontmatter_text = match.group(1)
134 result = {}
136 # Simple YAML parsing for common fields
137 for line in frontmatter_text.split("\n"):
138 line = line.strip()
139 if not line or line.startswith("#"):
140 continue
142 # Handle key: value pairs
143 if ":" in line:
144 key, _, value = line.partition(":")
145 key = key.strip()
146 value = value.strip().strip('"').strip("'")
147 if value:
148 result[key] = value
150 return result


def validate_mdx_post(filepath: Path, content: str) -> SyncedPost:
    """
    Validate an MDX blog post and return sync information.

    Checks for required frontmatter fields and returns warnings for issues.
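
    Example (illustrative):

        >>> post = validate_mdx_post(Path("hello.mdx"), '''---
        ... title: Hello
        ... pubDate: 2025-01-01
        ... ---
        ... Body
        ... ''')
        >>> post.title, post.has_required_frontmatter
        ('Hello', True)
        >>> len(post.warnings)  # description, author, category, heroImage missing
        4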
158 """
159 warnings = []
160 frontmatter = parse_mdx_frontmatter(content)
162 # Check required fields
163 required_fields = ["title", "pubDate"]
164 recommended_fields = ["description", "author", "category"]
166 title = frontmatter.get("title")
167 has_required = True
169 for field in required_fields:
170 if field not in frontmatter:
171 warnings.append(f"Missing required field: {field}")
172 has_required = False
174 for field in recommended_fields:
175 if field not in frontmatter:
176 warnings.append(f"Missing recommended field: {field}")
178 # Check for heroImage
179 if "heroImage" not in frontmatter:
180 warnings.append("No heroImage specified - will use default")
182 return SyncedPost(
183 filename=filepath.name,
184 title=title,
185 has_required_frontmatter=has_required,
186 warnings=warnings,
187 )


def normalize_ssh_key(key: str) -> str:
    """
    Normalize an SSH private key to ensure proper format.

    OpenSSH keys require:
    - Header on its own line
    - Base64 content (one line or wrapped)
    - Footer on its own line
    - Trailing newline

    This fixes keys that were pasted incorrectly (e.g., all on one line).
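
    Example (illustrative; .splitlines() shows the reconstructed line structure):

        >>> key = "-----BEGIN OPENSSH PRIVATE KEY----- QUJDMTIz -----END OPENSSH PRIVATE KEY-----"
        >>> normalize_ssh_key(key).splitlines()
        ['-----BEGIN OPENSSH PRIVATE KEY-----', 'QUJDMTIz', '-----END OPENSSH PRIVATE KEY-----']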
201 """
202 # Strip whitespace
203 key = key.strip()
205 # Handle the case where everything is on one line
206 # Match patterns like: -----BEGIN OPENSSH PRIVATE KEY----- base64... -----END OPENSSH PRIVATE KEY-----
207 openssh_match = re.match(
208 r"(-----BEGIN OPENSSH PRIVATE KEY-----)\s*(.+?)\s*(-----END OPENSSH PRIVATE KEY-----)",
209 key,
210 re.DOTALL,
211 )
213 if openssh_match:
214 header, content, footer = openssh_match.groups()
215 # Clean up the base64 content (remove any embedded newlines/spaces)
216 content = re.sub(r"\s+", "", content)
217 # Reconstruct with proper line breaks
218 return f"{header}\n{content}\n{footer}\n"
220 # Also handle RSA keys
221 rsa_match = re.match(
222 r"(-----BEGIN RSA PRIVATE KEY-----)\s*(.+?)\s*(-----END RSA PRIVATE KEY-----)",
223 key,
224 re.DOTALL,
225 )
227 if rsa_match:
228 header, content, footer = rsa_match.groups()
229 content = re.sub(r"\s+", "", content)
230 return f"{header}\n{content}\n{footer}\n"
232 # If no match, ensure trailing newline at minimum
233 return key + "\n" if not key.endswith("\n") else key


def sync_git_repository(
    git_url: str,
    git_branch: str,
    content_path: str | None,
    auth_method: str,
    auth_credential: str | None,
    brokerage_slug: str,
) -> tuple[bool, str | None, list[SyncedPost], list[str], list[str], list[str]]:
    """
    Shallow-clone a git repository into a temporary directory and analyze its content.

    Returns:
        (success, commit_sha, posts_found, assets_found, warnings, errors)
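
    Usage sketch (illustrative; assumes a hypothetical public repo with no auth):

        ok, sha, posts, assets, warns, errs = sync_git_repository(
            git_url="https://github.com/example/blog-content.git",
            git_branch="main",
            content_path=None,
            auth_method="none",
            auth_credential=None,
            brokerage_slug="example-brokerage",
        )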
249 """
250 posts_found: list[SyncedPost] = []
251 assets_found: list[str] = []
252 warnings: list[str] = []
253 errors: list[str] = []
254 commit_sha: str | None = None
256 # Create temp directory for cloning
257 with tempfile.TemporaryDirectory(prefix="content_sync_") as tmp_dir:
258 repo_dir = Path(tmp_dir) / "repo"
260 try:
261 # Build git clone command
262 env = os.environ.copy()
263 clone_url = git_url
265 if auth_method == "ssh_key" and auth_credential:
266 # Write SSH key to temp file, normalizing format
267 ssh_key_path = Path(tmp_dir) / "deploy_key"
268 ssh_key_path.write_text(normalize_ssh_key(auth_credential))
269 ssh_key_path.chmod(0o600)
271 # Configure SSH to use the key
272 env["GIT_SSH_COMMAND"] = f"ssh -i {ssh_key_path} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
274 elif auth_method == "token" and auth_credential:
275 # Insert token into HTTPS URL
276 if git_url.startswith("https://"):
277 # https://github.com/... -> https://token@github.com/...
278 clone_url = git_url.replace("https://", f"https://{auth_credential}@")
280 # Clone the repository
281 result = subprocess.run(
282 ["git", "clone", "--depth", "1", "--branch", git_branch, clone_url, str(repo_dir)],
283 capture_output=True,
284 text=True,
285 timeout=60,
286 env=env,
287 )
289 if result.returncode != 0:
290 error_msg = result.stderr.strip() or result.stdout.strip() or "Git clone failed"
291 # Sanitize error message to remove credentials
292 error_msg = re.sub(r"https://[^@]+@", "https://***@", error_msg)
293 errors.append(f"Clone failed: {error_msg}")
294 return False, None, posts_found, assets_found, warnings, errors

            # Get the commit SHA
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True,
                text=True,
                timeout=60,
                cwd=repo_dir,
            )
            if result.returncode == 0:
                commit_sha = result.stdout.strip()[:40]

            # Determine the content directory
            if content_path:
                content_dir = repo_dir / content_path
            else:
                # Try common paths
                for try_path in ["posts", "content/posts", "content", ""]:
                    test_dir = repo_dir / try_path if try_path else repo_dir
                    if test_dir.exists() and any(test_dir.glob("*.mdx")):
                        content_dir = test_dir
                        if try_path:
                            warnings.append(f"Auto-detected content path: {try_path}")
                        break
                else:
                    content_dir = repo_dir

            # Find and validate MDX files
            if content_dir.exists():
                # Use a set to deduplicate (*.mdx overlaps with **/*.mdx in the root)
                mdx_files = set(content_dir.glob("*.mdx")) | set(content_dir.glob("**/*.mdx"))

                for mdx_path in sorted(mdx_files, key=lambda p: p.name):
                    try:
                        content = mdx_path.read_text(encoding="utf-8")
                        post_info = validate_mdx_post(mdx_path, content)
                        posts_found.append(post_info)
                    except Exception as e:
                        errors.append(f"Failed to read {mdx_path.name}: {e}")
            else:
                warnings.append(f"Content path not found: {content_path or 'root'}")

            # Find image assets
            image_suffixes = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}
            assets_dir = repo_dir / "assets" / "images"
            if not assets_dir.exists():
                assets_dir = repo_dir / "images"

            if assets_dir.exists():
                for asset in assets_dir.rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in image_suffixes:
                        assets_found.append(asset.name)
            else:
                # Fall back to scanning the repo root for images (pathlib globs
                # do not support brace patterns like *.{jpg,png}, so filter by suffix)
                for asset in repo_dir.rglob("*"):
                    if asset.is_file() and asset.suffix.lower() in image_suffixes:
                        assets_found.append(asset.relative_to(repo_dir).as_posix())

            if not posts_found:
                warnings.append("No MDX posts found in repository")

            return True, commit_sha, posts_found, assets_found, warnings, errors

        except subprocess.TimeoutExpired:
            errors.append("Git operation timed out after 60 seconds")
            return False, None, posts_found, assets_found, warnings, errors
        except Exception as e:
            errors.append(f"Sync error: {e}")
            return False, None, posts_found, assets_found, warnings, errors


# ===== Content Source CRUD Endpoints =====


@router.get("/content-sources")
async def list_content_sources(
    user: AdminUser,
    db: Session = Depends(get_db),
    enabled_only: bool = Query(False, description="Only return enabled sources"),
    include_credentials: bool = Query(
        False,
        description="Include auth credentials for build-time sync (API key auth required)",
    ),
):
    """
    List all content sources across all brokerages.

    This endpoint is primarily used by the build-time sync script to
    fetch all repositories that need to be cloned or pulled.

    Set include_credentials=true to get auth credentials for cloning.
    For security, credentials are only returned with API key authentication.

    Requires admin role.
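
    Example request (illustrative):

        GET /content-sources?enabled_only=true&include_credentials=true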
387 """
388 query = (
389 select(BrokerageContentSource, Brokerage.slug, Brokerage.name)
390 .join(Brokerage, BrokerageContentSource.brokerage_id == Brokerage.id)
391 .where(Brokerage.disabled_at.is_(None))
392 )
394 if enabled_only:
395 query = query.where(BrokerageContentSource.enabled == True)
397 results = db.execute(query).all()
399 items = []
400 for source, slug, name in results:
401 if include_credentials:
402 # Build response includes credentials for cloning
403 items.append(
404 ContentSourceBuildResponse(
405 id=source.id,
406 brokerage_id=source.brokerage_id,
407 brokerage_slug=slug,
408 brokerage_name=name,
409 git_url=source.git_url,
410 git_branch=source.git_branch,
411 content_path=source.content_path,
412 auth_method=source.auth_method,
413 auth_credential=source.auth_credential,
414 enabled=source.enabled,
415 last_synced_at=source.last_synced_at,
416 last_sync_commit=source.last_sync_commit,
417 )
418 )
419 else:
420 response = ContentSourceResponse.model_validate(source)
421 response.brokerage_slug = slug
422 response.brokerage_name = name
423 items.append(response)
425 return {
426 "items": items,
427 "total": len(items),
428 }


@router.get("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse | None)
async def get_brokerage_content_source(
    brokerage_id: int,
    user: BrokerUser,
    db: Session = Depends(get_db),
):
    """
    Get the content source configuration for a specific brokerage.

    Returns null if no content source is configured.

    Authorization:
    - Admins can view any brokerage's content source
    - Brokers can only view their own brokerage's content source
    """
    # Authorization check for non-admins
    if user.role != "admin" and user.brokerage_id != brokerage_id:
        raise HTTPException(status_code=403, detail="Cannot access another brokerage's content source")

    # Verify the brokerage exists
    brokerage = db.get(Brokerage, brokerage_id)
    if not brokerage or brokerage.disabled_at:
        raise HTTPException(status_code=404, detail="Brokerage not found")

    # Get the content source
    source = db.scalar(
        select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id)
    )

    if not source:
        return None

    response = ContentSourceResponse.model_validate(source)
    response.brokerage_slug = brokerage.slug
    response.brokerage_name = brokerage.name
    return response


@router.put("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse)
async def upsert_brokerage_content_source(
    brokerage_id: int,
    data: ContentSourceCreate,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """
    Create or update the content source for a brokerage.

    This is an upsert operation: if no content source exists, one is created;
    if one already exists, it is updated.

    Requires admin role.
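
    Example request body (illustrative values):

        {
            "git_url": "https://github.com/example/blog-content.git",
            "git_branch": "main",
            "content_path": "posts",
            "auth_method": "token",
            "auth_credential": "<personal-access-token>",
            "enabled": true
        }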
483 """
484 # Verify brokerage exists
485 brokerage = db.get(Brokerage, brokerage_id)
486 if not brokerage or brokerage.disabled_at:
487 raise HTTPException(status_code=404, detail="Brokerage not found")
489 # Check for existing content source
490 source = db.scalar(
491 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id)
492 )
494 if source:
495 # Update existing
496 for field, value in data.model_dump(exclude_unset=True).items():
497 setattr(source, field, value)
498 source.updated_at = datetime.utcnow()
499 else:
500 # Create new
501 source = BrokerageContentSource(
502 brokerage_id=brokerage_id,
503 **data.model_dump(),
504 created_at=datetime.utcnow(),
505 updated_at=datetime.utcnow(),
506 )
507 db.add(source)
509 db.commit()
510 db.refresh(source)
512 response = ContentSourceResponse.model_validate(source)
513 response.brokerage_slug = brokerage.slug
514 response.brokerage_name = brokerage.name
515 return response


@router.patch("/brokerages/{brokerage_id}/content-source", response_model=ContentSourceResponse)
async def update_brokerage_content_source(
    brokerage_id: int,
    data: ContentSourceUpdate,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """
    Partially update the content source for a brokerage.

    Only the provided fields are updated. Requires an existing content source.

    Requires admin role.
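
    Example request body (illustrative; any subset of fields):

        {"enabled": false}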
531 """
532 # Verify brokerage exists
533 brokerage = db.get(Brokerage, brokerage_id)
534 if not brokerage or brokerage.disabled_at:
535 raise HTTPException(status_code=404, detail="Brokerage not found")
537 # Get existing content source
538 source = db.scalar(
539 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id)
540 )
542 if not source:
543 raise HTTPException(status_code=404, detail="No content source configured for this brokerage")
545 # Update only provided fields
546 update_data = data.model_dump(exclude_unset=True)
547 for field, value in update_data.items():
548 setattr(source, field, value)
550 source.updated_at = datetime.utcnow()
551 db.commit()
552 db.refresh(source)
554 response = ContentSourceResponse.model_validate(source)
555 response.brokerage_slug = brokerage.slug
556 response.brokerage_name = brokerage.name
557 return response


@router.delete("/brokerages/{brokerage_id}/content-source")
async def delete_brokerage_content_source(
    brokerage_id: int,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """
    Remove the content source configuration for a brokerage.

    This does not delete any synced content - that remains in the build cache
    until the next build cleans it up.

    Requires admin role.
    """
    # Verify the brokerage exists
    brokerage = db.get(Brokerage, brokerage_id)
    if not brokerage or brokerage.disabled_at:
        raise HTTPException(status_code=404, detail="Brokerage not found")

    # Get the existing content source
    source = db.scalar(
        select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id)
    )

    if not source:
        raise HTTPException(status_code=404, detail="No content source configured for this brokerage")

    db.delete(source)
    db.commit()

    return {"message": f"Content source removed for brokerage '{brokerage.slug}'"}


@router.post("/brokerages/{brokerage_id}/content-source/sync", response_model=ContentSourceSyncResult)
async def trigger_content_source_sync(
    brokerage_id: int,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """
    Trigger a manual sync for a brokerage's content source.

    This endpoint is for testing/debugging. In production, content is synced
    at build time by the sync script.

    The sync process:
    1. Shallow-clones the git repository into a temporary directory
    2. Validates the MDX posts and inventories image assets
    3. Updates the last_synced_at and last_sync_commit fields

    Requires admin role.
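
    Example (illustrative):

        POST /brokerages/42/content-source/sync
        -> {"success": true, "commit": "abc1234...", "message": "Synced 3 posts", ...}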
611 """
612 # Verify brokerage exists
613 brokerage = db.get(Brokerage, brokerage_id)
614 if not brokerage or brokerage.disabled_at:
615 raise HTTPException(status_code=404, detail="Brokerage not found")
617 # Get content source
618 source = db.scalar(
619 select(BrokerageContentSource).where(BrokerageContentSource.brokerage_id == brokerage_id)
620 )
622 if not source:
623 raise HTTPException(status_code=404, detail="No content source configured for this brokerage")
625 if not source.enabled:
626 return ContentSourceSyncResult(
627 success=False,
628 brokerage_slug=brokerage.slug,
629 commit=None,
630 message="Content source is disabled",
631 synced_at=None,
632 )
634 # Perform the actual git sync
635 success, commit_sha, posts_found, assets_found, warnings, errors = sync_git_repository(
636 git_url=source.git_url,
637 git_branch=source.git_branch,
638 content_path=source.content_path,
639 auth_method=source.auth_method,
640 auth_credential=source.auth_credential,
641 brokerage_slug=brokerage.slug,
642 )
644 now = datetime.now(timezone.utc)
646 if success:
647 # Update the content source with sync results
648 source.last_synced_at = now
649 source.last_sync_commit = commit_sha
650 source.last_sync_error = None
651 db.commit()
653 # Build success message
654 post_count = len(posts_found)
655 valid_count = sum(1 for p in posts_found if p.has_required_frontmatter)
656 message = f"Synced {post_count} posts"
657 if valid_count < post_count:
658 message += f" ({valid_count} valid, {post_count - valid_count} with issues)"
659 if assets_found:
660 message += f", {len(assets_found)} assets"
662 return ContentSourceSyncResult(
663 success=True,
664 brokerage_slug=brokerage.slug,
665 commit=commit_sha,
666 message=message,
667 synced_at=now,
668 posts_found=posts_found,
669 assets_found=assets_found,
670 warnings=warnings,
671 errors=errors,
672 )
673 else:
674 # Update with error info
675 error_message = "; ".join(errors) if errors else "Unknown sync error"
676 source.last_sync_error = error_message
677 db.commit()
679 return ContentSourceSyncResult(
680 success=False,
681 brokerage_slug=brokerage.slug,
682 commit=None,
683 message=f"Sync failed: {error_message}",
684 synced_at=None,
685 posts_found=posts_found,
686 assets_found=assets_found,
687 warnings=warnings,
688 errors=errors,
689 )