"""Scanner HTTP router — scan trigger, entity query endpoints.""" from __future__ import annotations from dataclasses import asdict from fastapi import APIRouter from fastapi.responses import JSONResponse from app.modules.project.application.services import ProjectService from app.modules.scanner.application.services import ScanService from app.modules.scanner.domain.entities import ScanResult router = APIRouter(prefix="/projects/{project_id}", tags=["scanner"]) _project_service: ProjectService | None = None _scan_service: ScanService | None = None def init_router(project_service: ProjectService, scan_service: ScanService) -> None: global _project_service, _scan_service _project_service = project_service _scan_service = scan_service def _scan_result_response(result: ScanResult) -> dict: """Build API response for ScanResult (no entity lists).""" return { "project_id": result.project_id, "scanned_at": result.scanned_at.isoformat(), "file_statuses": [ { "path": fs.path, "status": fs.status.value, "content_lines": fs.content_lines, } for fs in result.file_statuses ], "summary": { "total_files": result.summary.total_files, "ok": result.summary.ok, "sparse": result.summary.sparse, "missing": result.summary.missing, "placeholder_heavy": result.summary.placeholder_heavy, "template_residue": result.summary.template_residue, }, } def _entity_to_dict(entity) -> dict: """Convert a dataclass entity to a dict using asdict.""" return asdict(entity) def _integration_to_dict(integration) -> dict: """Convert Integration to dict with source_id/target_id mapped to source/target per spec.""" d = asdict(integration) d["source"] = d.pop("source_id") d["target"] = d.pop("target_id") return d def _get_scan_or_404(project_id: str) -> ScanResult | None: """Get cached scan result or return None.""" return _scan_service.get_latest_scan(project_id) # ── Scan endpoints ── @router.post("/scan") def trigger_scan(project_id: str): project = _project_service.get_project(project_id) result = _scan_service.scan(project) return _scan_result_response(result) @router.get("/scan") def get_scan(project_id: str): _project_service.get_project(project_id) # Ensure project exists (raises 404) result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return _scan_result_response(result) # ── Entity list endpoints ── @router.get("/entities/capabilities") def list_capabilities(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(c) for c in result.capabilities] @router.get("/entities/modules") def list_modules(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(m) for m in result.modules] @router.get("/entities/entities") def list_entities(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(e) for e in result.entities] @router.get("/entities/integrations") def list_integrations(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_integration_to_dict(i) for i in result.integrations] @router.get("/entities/value-flows") def list_value_flows(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(v) for v in result.value_flows] @router.get("/entities/user-journeys") def list_user_journeys(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(j) for j in result.user_journeys] @router.get("/entities/data-flows") def list_data_flows(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(d) for d in result.data_flows] @router.get("/entities/external-systems") def list_external_systems(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(s) for s in result.external_systems] @router.get("/entities/traceability-links") def list_traceability_links(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(t) for t in result.traceability_links] @router.get("/entities/runtime-components") def list_runtime_components(project_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) return [_entity_to_dict(c) for c in result.runtime_components] # ── Detail endpoints ── @router.get("/entities/capabilities/{capability_id}") def get_capability_detail(project_id: str, capability_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) cap = next((c for c in result.capabilities if c.capability_id == capability_id), None) if cap is None: return JSONResponse(status_code=404, content={"detail": f"Capability not found: {capability_id}"}) # Find related modules via traceability links related_module_ids = { link.module_id for link in result.traceability_links if link.capability_id == capability_id } related_modules = [m for m in result.modules if m.module_id in related_module_ids] # Find related value flows related_vf_ids = set(cap.related_value_flows) related_value_flows = [v for v in result.value_flows if v.value_flow_id in related_vf_ids] return { "capability": _entity_to_dict(cap), "modules": [_entity_to_dict(m) for m in related_modules], "value_flows": [_entity_to_dict(v) for v in related_value_flows], } @router.get("/entities/modules/{module_id}") def get_module_detail(project_id: str, module_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) mod = next((m for m in result.modules if m.module_id == module_id), None) if mod is None: return JSONResponse(status_code=404, content={"detail": f"Module not found: {module_id}"}) # Find owned entities (entity.owner_module matches) owned_entities = [e for e in result.entities if e.owner_module == module_id] # Find integrations where source or target matches related_integrations = [ i for i in result.integrations if i.source_id == module_id or i.target_id == module_id ] # Find codebase alignment alignment = next( (a for a in result.codebase_alignments if a.module_id == module_id), None ) return { "module": _entity_to_dict(mod), "entities": [_entity_to_dict(e) for e in owned_entities], "integrations": [_integration_to_dict(i) for i in related_integrations], "codebase_alignment": _entity_to_dict(alignment) if alignment else None, } @router.get("/entities/entities/{entity_id}") def get_entity_detail(project_id: str, entity_id: str): result = _get_scan_or_404(project_id) if result is None: return JSONResponse(status_code=404, content={"detail": "No scan available"}) entity = next((e for e in result.entities if e.entity_id == entity_id), None) if entity is None: return JSONResponse(status_code=404, content={"detail": f"Entity not found: {entity_id}"}) # Find data flows where source or target matches entity name or entity_id related_data_flows = [ d for d in result.data_flows if entity_id in d.source or entity_id in d.target ] return { "entity": _entity_to_dict(entity), "data_flows": [_entity_to_dict(d) for d in related_data_flows], }