Coverage for src / idx_api / routers / data_management.py: 19%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 11:09 -0700

1"""Data management endpoints for import, export, and statistics.""" 

2 

3import json 

4from datetime import datetime 

5 

6from fastapi import APIRouter, Depends, HTTPException, Query 

7from fastapi.responses import Response 

8from pydantic import BaseModel 

9from sqlalchemy import func 

10from sqlalchemy.orm import Session 

11 

12from idx_api.auth import AdminUser 

13from idx_api.database import get_db 

14from idx_api.models.agent import Agent 

15from idx_api.models.api_key import APIKey 

16from idx_api.models.brokerage import Brokerage 

17from idx_api.models.suggestion import Suggestion 

18from idx_api.models.tour_request import TourRequest 

19 

20router = APIRouter() 

21 

22 

23# ===== Response Models ===== 

24 

25 

class TableStats(BaseModel):
    """Row count for a single database table."""

    # Label identifying the table in the stats response (e.g. "brokers").
    table: str
    # Number of rows counted for that table.
    count: int

31 

32 

class DataStatsResponse(BaseModel):
    """Payload returned by the stats endpoint: per-table counts and their sum."""

    # One entry per counted table.
    tables: list[TableStats]
    # Sum of the counts across all entries in ``tables``.
    total_records: int

38 

39 

40# ===== Data Management Endpoints ===== 

41 

42 

@router.get("/data/stats", response_model=DataStatsResponse)
async def get_data_stats(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Report the row count of every managed table plus the grand total.

    Each table is counted with ``COUNT(id)``; a missing scalar result is
    treated as zero. Any failure is surfaced as an HTTP 500 whose detail
    carries the underlying error text.
    """
    # Response label -> ORM model, in the order the entries are returned.
    counted_models = (
        ("brokers", Brokerage),
        ("agents", Agent),
        ("tours", TourRequest),
        ("suggestions", Suggestion),
        ("api_keys", APIKey),
    )

    try:
        per_table = []
        grand_total = 0

        for label, model in counted_models:
            row_count = db.query(func.count(model.id)).scalar() or 0
            per_table.append(TableStats(table=label, count=row_count))
            grand_total += row_count

        return DataStatsResponse(
            tables=per_table,
            total_records=grand_total,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get stats: {str(e)}",
        )

87 

88 

@router.get("/data/export")
async def export_data(
    user: AdminUser,
    table: str | None = Query(None, description="Specific table to export (or all if not specified)"),
    db: Session = Depends(get_db),
):
    """Export database data as a downloadable JSON document.

    Args:
        user: Authenticated admin (dependency-injected).
        table: Name of a single table to export. When omitted (or empty),
            every table is exported.
        db: Database session (dependency-injected).

    Returns:
        A JSON ``Response`` with a ``Content-Disposition: attachment`` header.
        The document maps each exported table name to a list of row dicts.

    Raises:
        HTTPException: 400 when ``table`` is not a known table name;
            500 when querying or serialising fails.
    """
    exportable = ("brokers", "agents", "tours", "suggestions", "api_keys")

    # Validate before the try block: the broad handler below would otherwise
    # rewrite this 400 into a 500. Rejecting unknown names also keeps
    # arbitrary caller-supplied text out of the Content-Disposition filename
    # and avoids returning an empty "successful" export for a typo.
    if table and table not in exportable:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown table {table!r}; expected one of: {', '.join(exportable)}",
        )

    def wanted(name: str) -> bool:
        """Return True when ``name`` is part of this export."""
        return not table or table == name

    def iso(value):
        """Render a date/datetime as ISO-8601 text, passing None through."""
        return value.isoformat() if value else None

    try:
        data = {}

        if wanted("brokers"):
            data["brokers"] = [
                {
                    "id": b.id,
                    "slug": b.slug,
                    "name": b.name,
                    "tagline": b.tagline,
                    "email": b.email,
                    "phone": b.phone,
                    "military_specialist": b.military_specialist,
                    "va_loans": b.va_loans,
                    "franchise_affiliation": b.franchise_affiliation,
                    "photo_url": b.photo_url,
                }
                for b in db.query(Brokerage).all()
            ]

        if wanted("agents"):
            data["agents"] = [
                {
                    "id": a.id,
                    "broker_id": a.broker_id,
                    "name": a.name,
                    "email": a.email,
                    "phone": a.phone,
                    "bio": a.bio,
                    "status": a.status,
                    "photo_url": a.photo_url,
                }
                for a in db.query(Agent).all()
            ]

        if wanted("tours"):
            data["tours"] = [
                {
                    "id": t.id,
                    "property_id": t.property_id,
                    "name": t.name,
                    "email": t.email,
                    "phone": t.phone,
                    "message": t.message,
                    "preferred_date": iso(t.preferred_date),
                    "status": t.status,
                    "created_at": iso(t.created_at),
                }
                for t in db.query(TourRequest).all()
            ]

        if wanted("suggestions"):
            data["suggestions"] = [
                {
                    "id": s.id,
                    "name": s.name,
                    "email": s.email,
                    "suggestion": s.suggestion,
                    "created_at": iso(s.created_at),
                }
                for s in db.query(Suggestion).all()
            ]

        # API keys are sensitive: the stored key hash is part of the export.
        if wanted("api_keys"):
            data["api_keys"] = [
                {
                    "id": k.id,
                    "name": k.name,
                    "key_hash": k.key_hash,
                    "role": k.role,
                    "broker_id": k.broker_id,
                    "agent_id": k.agent_id,
                    "created_at": iso(k.created_at),
                }
                for k in db.query(APIKey).all()
            ]

        # Return as downloadable JSON; ``table`` is validated above, so the
        # filename only ever contains a known table name or "all".
        content = json.dumps(data, indent=2)
        filename = f"export_{table or 'all'}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        return Response(
            content=content,
            media_type="application/json",
            headers={"Content-Disposition": f'attachment; filename="{filename}"'},
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Export failed: {str(e)}",
        )

197 

198 

@router.post("/data/import")
async def import_data(
    user: AdminUser,
    data: dict,
    clear_before_import: bool = False,
    db: Session = Depends(get_db),
):
    """Import database data from a JSON document, all-or-nothing.

    The whole operation (optional clearing plus every insert) runs in one
    transaction that is committed once at the end. If anything fails, the
    rollback restores the database to its prior state, so a bad payload can
    no longer leave tables wiped or half-imported.

    Args:
        user: Authenticated admin (dependency-injected).
        data: Mapping of table name ("brokers", "agents", "tours",
            "suggestions", "api_keys") to a list of row dicts. Tables that
            are absent from the mapping are left untouched.
        clear_before_import: When true, existing rows are deleted from each
            table present in ``data`` before the new rows are inserted.
        db: Database session (dependency-injected).

    Returns:
        A dict with ``success``, per-table ``imported`` counts and a summary
        ``message``.

    Raises:
        HTTPException: 500 when any step fails; nothing is persisted.
    """
    # (payload key, model, fields holding ISO-8601 text) in insert order.
    import_plan = (
        ("brokers", Brokerage, ()),
        ("agents", Agent, ()),
        ("tours", TourRequest, ("preferred_date", "created_at")),
        ("suggestions", Suggestion, ("created_at",)),
        ("api_keys", APIKey, ("created_at",)),
    )
    # Deletion order: API keys and agents are removed before brokers.
    clear_order = (
        ("api_keys", APIKey),
        ("agents", Agent),
        ("brokers", Brokerage),
        ("tours", TourRequest),
        ("suggestions", Suggestion),
    )

    try:
        imported = {key: 0 for key, _, _ in import_plan}

        if clear_before_import:
            for key, model in clear_order:
                if key in data:
                    db.query(model).delete()

        for key, model, datetime_fields in import_plan:
            if key not in data:
                continue

            for record in data[key]:
                # NOTE(review): "id" is dropped so the database assigns new
                # primary keys, but reference columns such as broker_id /
                # agent_id are inserted as given and may not match the newly
                # generated ids - confirm this is intended.
                fields = {k: v for k, v in record.items() if k != "id"}

                # Convert ISO strings back to datetime.
                for name in datetime_fields:
                    if fields.get(name):
                        fields[name] = datetime.fromisoformat(fields[name])

                db.add(model(**fields))
                imported[key] += 1

            # Emit this table's INSERTs now so errors surface per table,
            # without ending the transaction.
            db.flush()

        # Single commit: either everything above is persisted or nothing is.
        db.commit()

        return {
            "success": True,
            "imported": imported,
            "message": f"Successfully imported {sum(imported.values())} records",
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=500,
            detail=f"Import failed: {str(e)}",
        )

293 

294 

@router.post("/data/clear")
async def clear_data(
    user: AdminUser,
    table: str = Query(..., description="Table to clear (or 'all' to clear everything)"),
    db: Session = Depends(get_db),
):
    """Delete every row from one table, or from all tables.

    Args:
        user: Authenticated admin (dependency-injected).
        table: One of "api_keys", "agents", "brokers", "tours",
            "suggestions", or "all".
        db: Database session (dependency-injected).

    Returns:
        A dict with ``success``, the number of ``deleted`` rows and a summary
        ``message``.

    Raises:
        HTTPException: 400 when ``table`` is not a recognised name;
            500 when the delete fails (the transaction is rolled back).
    """
    # Deletion order: API keys and agents are removed before brokers.
    clear_order = (
        ("api_keys", APIKey),
        ("agents", Agent),
        ("brokers", Brokerage),
        ("tours", TourRequest),
        ("suggestions", Suggestion),
    )
    known_tables = [name for name, _ in clear_order]

    # Validate before the try block: the broad handler below would otherwise
    # rewrite this 400 into a 500. Without this check a typo deleted nothing
    # and was still reported as a success.
    if table != "all" and table not in known_tables:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown table {table!r}; expected 'all' or one of: {', '.join(known_tables)}",
        )

    try:
        deleted = 0

        for name, model in clear_order:
            if table in ("all", name):
                # Query.delete() returns the number of rows matched.
                deleted += db.query(model).delete()

        db.commit()

        return {
            "success": True,
            "deleted": deleted,
            "message": f"Successfully cleared {deleted} records from {table}",
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=500,
            detail=f"Clear operation failed: {str(e)}",
        )