254 lines
8.7 KiB
Python
254 lines
8.7 KiB
Python
"""Scanner HTTP router — scan trigger, entity query endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import asdict
|
|
|
|
from fastapi import APIRouter
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from app.modules.project.application.services import ProjectService
|
|
from app.modules.scanner.application.services import ScanService
|
|
from app.modules.scanner.domain.entities import ScanResult
|
|
|
|
router = APIRouter(prefix="/projects/{project_id}", tags=["scanner"])
|
|
|
|
_project_service: ProjectService | None = None
|
|
_scan_service: ScanService | None = None
|
|
|
|
|
|
def init_router(project_service: ProjectService, scan_service: ScanService) -> None:
|
|
global _project_service, _scan_service
|
|
_project_service = project_service
|
|
_scan_service = scan_service
|
|
|
|
|
|
def _scan_result_response(result: ScanResult) -> dict:
|
|
"""Build API response for ScanResult (no entity lists)."""
|
|
return {
|
|
"project_id": result.project_id,
|
|
"scanned_at": result.scanned_at.isoformat(),
|
|
"file_statuses": [
|
|
{
|
|
"path": fs.path,
|
|
"status": fs.status.value,
|
|
"content_lines": fs.content_lines,
|
|
}
|
|
for fs in result.file_statuses
|
|
],
|
|
"summary": {
|
|
"total_files": result.summary.total_files,
|
|
"ok": result.summary.ok,
|
|
"sparse": result.summary.sparse,
|
|
"missing": result.summary.missing,
|
|
"placeholder_heavy": result.summary.placeholder_heavy,
|
|
"template_residue": result.summary.template_residue,
|
|
},
|
|
}
|
|
|
|
|
|
def _entity_to_dict(entity) -> dict:
|
|
"""Convert a dataclass entity to a dict using asdict."""
|
|
return asdict(entity)
|
|
|
|
|
|
def _integration_to_dict(integration) -> dict:
|
|
"""Convert Integration to dict with source_id/target_id mapped to source/target per spec."""
|
|
d = asdict(integration)
|
|
d["source"] = d.pop("source_id")
|
|
d["target"] = d.pop("target_id")
|
|
return d
|
|
|
|
|
|
def _get_scan_or_404(project_id: str) -> ScanResult | None:
|
|
"""Get cached scan result or return None."""
|
|
return _scan_service.get_latest_scan(project_id)
|
|
|
|
|
|
# ── Scan endpoints ──
|
|
|
|
|
|
@router.post("/scan")
|
|
def trigger_scan(project_id: str):
|
|
project = _project_service.get_project(project_id)
|
|
result = _scan_service.scan(project)
|
|
return _scan_result_response(result)
|
|
|
|
|
|
@router.get("/scan")
|
|
def get_scan(project_id: str):
|
|
_project_service.get_project(project_id) # Ensure project exists (raises 404)
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return _scan_result_response(result)
|
|
|
|
|
|
# ── Entity list endpoints ──
|
|
|
|
|
|
@router.get("/entities/capabilities")
|
|
def list_capabilities(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(c) for c in result.capabilities]
|
|
|
|
|
|
@router.get("/entities/modules")
|
|
def list_modules(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(m) for m in result.modules]
|
|
|
|
|
|
@router.get("/entities/entities")
|
|
def list_entities(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(e) for e in result.entities]
|
|
|
|
|
|
@router.get("/entities/integrations")
|
|
def list_integrations(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_integration_to_dict(i) for i in result.integrations]
|
|
|
|
|
|
@router.get("/entities/value-flows")
|
|
def list_value_flows(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(v) for v in result.value_flows]
|
|
|
|
|
|
@router.get("/entities/user-journeys")
|
|
def list_user_journeys(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(j) for j in result.user_journeys]
|
|
|
|
|
|
@router.get("/entities/data-flows")
|
|
def list_data_flows(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(d) for d in result.data_flows]
|
|
|
|
|
|
@router.get("/entities/external-systems")
|
|
def list_external_systems(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(s) for s in result.external_systems]
|
|
|
|
|
|
@router.get("/entities/traceability-links")
|
|
def list_traceability_links(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(t) for t in result.traceability_links]
|
|
|
|
|
|
@router.get("/entities/runtime-components")
|
|
def list_runtime_components(project_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
return [_entity_to_dict(c) for c in result.runtime_components]
|
|
|
|
|
|
# ── Detail endpoints ──
|
|
|
|
|
|
@router.get("/entities/capabilities/{capability_id}")
|
|
def get_capability_detail(project_id: str, capability_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
|
|
cap = next((c for c in result.capabilities if c.capability_id == capability_id), None)
|
|
if cap is None:
|
|
return JSONResponse(status_code=404, content={"detail": f"Capability not found: {capability_id}"})
|
|
|
|
# Find related modules via traceability links
|
|
related_module_ids = {
|
|
link.module_id
|
|
for link in result.traceability_links
|
|
if link.capability_id == capability_id
|
|
}
|
|
related_modules = [m for m in result.modules if m.module_id in related_module_ids]
|
|
|
|
# Find related value flows
|
|
related_vf_ids = set(cap.related_value_flows)
|
|
related_value_flows = [v for v in result.value_flows if v.value_flow_id in related_vf_ids]
|
|
|
|
return {
|
|
"capability": _entity_to_dict(cap),
|
|
"modules": [_entity_to_dict(m) for m in related_modules],
|
|
"value_flows": [_entity_to_dict(v) for v in related_value_flows],
|
|
}
|
|
|
|
|
|
@router.get("/entities/modules/{module_id}")
|
|
def get_module_detail(project_id: str, module_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
|
|
mod = next((m for m in result.modules if m.module_id == module_id), None)
|
|
if mod is None:
|
|
return JSONResponse(status_code=404, content={"detail": f"Module not found: {module_id}"})
|
|
|
|
# Find owned entities (entity.owner_module matches)
|
|
owned_entities = [e for e in result.entities if e.owner_module == module_id]
|
|
|
|
# Find integrations where source or target matches
|
|
related_integrations = [
|
|
i for i in result.integrations
|
|
if i.source_id == module_id or i.target_id == module_id
|
|
]
|
|
|
|
# Find codebase alignment
|
|
alignment = next(
|
|
(a for a in result.codebase_alignments if a.module_id == module_id), None
|
|
)
|
|
|
|
return {
|
|
"module": _entity_to_dict(mod),
|
|
"entities": [_entity_to_dict(e) for e in owned_entities],
|
|
"integrations": [_integration_to_dict(i) for i in related_integrations],
|
|
"codebase_alignment": _entity_to_dict(alignment) if alignment else None,
|
|
}
|
|
|
|
|
|
@router.get("/entities/entities/{entity_id}")
|
|
def get_entity_detail(project_id: str, entity_id: str):
|
|
result = _get_scan_or_404(project_id)
|
|
if result is None:
|
|
return JSONResponse(status_code=404, content={"detail": "No scan available"})
|
|
|
|
entity = next((e for e in result.entities if e.entity_id == entity_id), None)
|
|
if entity is None:
|
|
return JSONResponse(status_code=404, content={"detail": f"Entity not found: {entity_id}"})
|
|
|
|
# Find data flows where source or target matches entity name or entity_id
|
|
related_data_flows = [
|
|
d for d in result.data_flows
|
|
if entity_id in d.source or entity_id in d.target
|
|
]
|
|
|
|
return {
|
|
"entity": _entity_to_dict(entity),
|
|
"data_flows": [_entity_to_dict(d) for d in related_data_flows],
|
|
}
|