arch-design-agent-skill-das.../backend/app/modules/scanner/interfaces/http/router.py
openclaw 602e69b56e feat(scanner): add REST API — scan trigger, entity query endpoints
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 16:37:10 +00:00

254 lines
8.7 KiB
Python

"""Scanner HTTP router — scan trigger, entity query endpoints."""
from __future__ import annotations
from dataclasses import asdict
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from app.modules.project.application.services import ProjectService
from app.modules.scanner.application.services import ScanService
from app.modules.scanner.domain.entities import ScanResult
# Router for the scanner endpoints; every route below is mounted under
# /projects/{project_id} and tagged "scanner".
router = APIRouter(prefix="/projects/{project_id}", tags=["scanner"])
# Service handles read by the route handlers. Both start as None and are
# assigned by init_router().
_project_service: ProjectService | None = None
_scan_service: ScanService | None = None
def init_router(project_service: ProjectService, scan_service: ScanService) -> None:
    """Wire the application services into this router module.

    Both services are stored in the module-level ``_project_service`` and
    ``_scan_service`` globals that the route handlers read. Calling this
    again replaces the previously stored services.
    """
    global _project_service, _scan_service
    _project_service, _scan_service = project_service, scan_service
def _scan_result_response(result: ScanResult) -> dict:
    """Serialize a ScanResult into the payload returned by the scan endpoints.

    The payload carries the project id, the scan timestamp in ISO-8601 form,
    one row per file status, and the summary fields. Entity lists held by the
    result are intentionally left out.
    """
    summary_fields = (
        "total_files",
        "ok",
        "sparse",
        "missing",
        "placeholder_heavy",
        "template_residue",
    )

    def file_row(file_status) -> dict:
        # Emit the status object's ``.value`` rather than the object itself.
        return {
            "path": file_status.path,
            "status": file_status.status.value,
            "content_lines": file_status.content_lines,
        }

    summary = result.summary
    return {
        "project_id": result.project_id,
        "scanned_at": result.scanned_at.isoformat(),
        "file_statuses": [file_row(entry) for entry in result.file_statuses],
        "summary": {name: getattr(summary, name) for name in summary_fields},
    }
def _entity_to_dict(entity) -> dict:
    """Return the plain-dict form of a dataclass instance.

    Delegates to ``dataclasses.asdict``, so nested dataclasses are converted
    recursively as well.
    """
    as_plain_dict = asdict(entity)
    return as_plain_dict
def _integration_to_dict(integration) -> dict:
    """Return an Integration as a dict with its endpoint keys renamed per spec.

    The dataclass fields ``source_id`` and ``target_id`` are exposed as
    ``source`` and ``target``; every other field is passed through unchanged.
    """
    payload = asdict(integration)
    # Rename in this order so "source" is inserted before "target".
    for spec_key, field_name in (("source", "source_id"), ("target", "target_id")):
        payload[spec_key] = payload.pop(field_name)
    return payload
def _get_scan_or_404(project_id: str) -> ScanResult | None:
    """Return the latest scan result for a project, or None when there is none.

    Despite the name, this helper never produces a 404 itself: it passes
    through whatever the scan service's ``get_latest_scan`` returns, and the
    callers turn a ``None`` into a 404 response.

    Raises:
        RuntimeError: if ``init_router()`` has not been called yet, so no
            scan service is available to query.
    """
    # Fail with an explicit message instead of an opaque
    # "'NoneType' object has no attribute 'get_latest_scan'".
    if _scan_service is None:
        raise RuntimeError("Scanner router is not initialized: call init_router() first")
    return _scan_service.get_latest_scan(project_id)
# ── Scan endpoints ──
@router.post("/scan")
def trigger_scan(project_id: str):
    # Look the project up, run a scan on it via the scan service, and answer
    # with the serialized result (metadata, file statuses, summary).
    scanned = _scan_service.scan(_project_service.get_project(project_id))
    return _scan_result_response(scanned)
@router.get("/scan")
def get_scan(project_id: str):
    # Return the latest scan for the project, or a 404 JSON body if there is none.
    # The project lookup's return value is unused: it is called only because
    # it raises (404) when the project does not exist.
    _project_service.get_project(project_id)
    latest = _get_scan_or_404(project_id)
    if latest is not None:
        return _scan_result_response(latest)
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
# ── Entity list endpoints ──
@router.get("/entities/capabilities")
def list_capabilities(project_id: str):
    # List every capability of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.capabilities))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/modules")
def list_modules(project_id: str):
    # List every module of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.modules))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/entities")
def list_entities(project_id: str):
    # List every entity of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.entities))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/integrations")
def list_integrations(project_id: str):
    # List every integration of the project's latest scan; answers 404 when
    # no scan is available. Integrations go through _integration_to_dict so
    # source_id/target_id are exposed as source/target.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_integration_to_dict, scan.integrations))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/value-flows")
def list_value_flows(project_id: str):
    # List every value flow of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.value_flows))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/user-journeys")
def list_user_journeys(project_id: str):
    # List every user journey of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.user_journeys))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/data-flows")
def list_data_flows(project_id: str):
    # List every data flow of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.data_flows))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/external-systems")
def list_external_systems(project_id: str):
    # List every external system of the project's latest scan as plain dicts;
    # answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.external_systems))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/traceability-links")
def list_traceability_links(project_id: str):
    # List every traceability link of the project's latest scan as plain
    # dicts; answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.traceability_links))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
@router.get("/entities/runtime-components")
def list_runtime_components(project_id: str):
    # List every runtime component of the project's latest scan as plain
    # dicts; answers 404 when no scan is available.
    scan = _get_scan_or_404(project_id)
    if scan is not None:
        return list(map(_entity_to_dict, scan.runtime_components))
    return JSONResponse(status_code=404, content={"detail": "No scan available"})
# ── Detail endpoints ──
@router.get("/entities/capabilities/{capability_id}")
def get_capability_detail(project_id: str, capability_id: str):
    # Return one capability together with its related modules and value flows.
    # Answers 404 when the project has no scan or the capability id is unknown.
    scan = _get_scan_or_404(project_id)
    if scan is None:
        return JSONResponse(status_code=404, content={"detail": "No scan available"})

    # First capability whose id matches; the for/else handles "not found".
    for candidate in scan.capabilities:
        if candidate.capability_id == capability_id:
            capability = candidate
            break
    else:
        return JSONResponse(status_code=404, content={"detail": f"Capability not found: {capability_id}"})

    # Modules are tied to the capability through traceability links.
    linked_module_ids = set()
    for link in scan.traceability_links:
        if link.capability_id == capability_id:
            linked_module_ids.add(link.module_id)
    modules = [module for module in scan.modules if module.module_id in linked_module_ids]

    # Value flows are referenced by id on the capability itself.
    value_flow_ids = set(capability.related_value_flows)
    value_flows = [flow for flow in scan.value_flows if flow.value_flow_id in value_flow_ids]

    return {
        "capability": _entity_to_dict(capability),
        "modules": list(map(_entity_to_dict, modules)),
        "value_flows": list(map(_entity_to_dict, value_flows)),
    }
@router.get("/entities/modules/{module_id}")
def get_module_detail(project_id: str, module_id: str):
    # Return one module together with the entities it owns, the integrations
    # it takes part in, and its codebase alignment (None if there is none).
    # Answers 404 when the project has no scan or the module id is unknown.
    scan = _get_scan_or_404(project_id)
    if scan is None:
        return JSONResponse(status_code=404, content={"detail": "No scan available"})

    module = next(filter(lambda m: m.module_id == module_id, scan.modules), None)
    if module is None:
        return JSONResponse(status_code=404, content={"detail": f"Module not found: {module_id}"})

    # Entities whose owner_module points at this module.
    owned = [entity for entity in scan.entities if entity.owner_module == module_id]

    # Integrations that have this module on either end.
    touching = []
    for integration in scan.integrations:
        if integration.source_id == module_id or integration.target_id == module_id:
            touching.append(integration)

    # First codebase alignment recorded for this module, if any.
    alignment = None
    for candidate in scan.codebase_alignments:
        if candidate.module_id == module_id:
            alignment = candidate
            break

    return {
        "module": _entity_to_dict(module),
        "entities": list(map(_entity_to_dict, owned)),
        "integrations": list(map(_integration_to_dict, touching)),
        # Truthiness test (not `is None`): a falsy alignment is reported as None.
        "codebase_alignment": _entity_to_dict(alignment) if alignment else None,
    }
@router.get("/entities/entities/{entity_id}")
def get_entity_detail(project_id: str, entity_id: str):
    # Return one entity together with the data flows that mention it.
    # Answers 404 when the project has no scan or the entity id is unknown.
    scan = _get_scan_or_404(project_id)
    if scan is None:
        return JSONResponse(status_code=404, content={"detail": "No scan available"})

    # First entity whose id matches; the for/else handles "not found".
    for candidate in scan.entities:
        if candidate.entity_id == entity_id:
            entity = candidate
            break
    else:
        return JSONResponse(status_code=404, content={"detail": f"Entity not found: {entity_id}"})

    # A data flow is related when entity_id is contained in its source or its
    # target. Only the id is tested; the entity's name is not consulted.
    # NOTE(review): if source/target are strings this is a substring match
    # (id "user" would also match "superuser") - confirm that is intended.
    def mentions_entity(flow) -> bool:
        return entity_id in flow.source or entity_id in flow.target

    related_flows = list(filter(mentions_entity, scan.data_flows))

    return {
        "entity": _entity_to_dict(entity),
        "data_flows": list(map(_entity_to_dict, related_flows)),
    }