arch-design-agent-skill-das.../backend/app/modules/scanner/application/services.py
2026-03-23 16:33:49 +00:00

128 lines
4.5 KiB
Python

"""ScanService — orchestrates parsers, file status detection, and entity collection."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from app.modules.design.domain.services import DesignValidationService
from app.modules.design.domain.value_objects import FileStatus
from app.modules.project.domain.entities import Project
from app.modules.scanner.domain.entities import (
FileStatusEntry,
ScanResult,
ScanSummary,
)
from app.modules.scanner.infrastructure.parsers.csv_parser import CsvParser
from app.modules.scanner.infrastructure.parsers.md_parser import MdParser
from app.modules.scanner.infrastructure.parsers.openapi_parser import OpenapiParser
class ScanService:
    """Scan a project's design directory and produce a ScanResult.

    A scan walks every regular file under ``project.design_dir``, records a
    ``FileStatusEntry`` for each one, hands supported file types to the
    matching parser, and merges the parsed entities into one ``ScanResult``.
    The most recent result for each project is kept in an in-memory dict on
    this instance.
    """

    # Entity keys passed to ScanResult as a single object (the first parsed
    # item, or None) rather than as a list.
    _SINGLETON_KEYS: frozenset[str] = frozenset({
        "scope_and_goals", "system_context", "solution_layer",
        "module_boundary_rule", "runtime_topology",
        "operational_baseline", "release_plan",
    })

    def __init__(self) -> None:
        """Create the parsers and an empty per-project result cache."""
        self._csv_parser = CsvParser()
        self._md_parser = MdParser()
        self._openapi_parser = OpenapiParser()
        # Latest ScanResult per project, keyed by ``project.id``.
        self._cache: dict[str, ScanResult] = {}

    def scan(self, project: Project) -> ScanResult:
        """Scan ``project.design_dir`` recursively and cache the result.

        Args:
            project: Project whose ``design_dir`` is walked and whose ``id``
                keys the cached result.

        Returns:
            The freshly built ScanResult, which is also stored as the latest
            scan for ``project.id``.
        """
        design_dir = Path(project.design_dir)
        file_statuses: list[FileStatusEntry] = []
        all_entities: dict[str, list[Any]] = {}

        # Sorted so that status entries and merged entities have a stable order.
        for file_path in sorted(design_dir.rglob("*")):
            if not file_path.is_file():
                continue

            content = self._read_content(file_path)
            status = DesignValidationService.determine_file_status(
                content, str(file_path)
            )
            file_statuses.append(FileStatusEntry(
                path=str(file_path.relative_to(design_dir)),
                status=status,
                # "".splitlines() is empty, so unreadable files count 0 lines.
                content_lines=len(content.splitlines()),
            ))

            for key, entities in self._parse_file(file_path).items():
                all_entities.setdefault(key, []).extend(entities)

        kwargs: dict[str, Any] = {
            "project_id": project.id,
            "scanned_at": datetime.now(timezone.utc),
            "file_statuses": file_statuses,
            "summary": self._build_summary(file_statuses),
        }
        for key, entities in all_entities.items():
            if key in self._SINGLETON_KEYS:
                kwargs[key] = entities[0] if entities else None
            else:
                kwargs[key] = entities

        result = ScanResult(**kwargs)
        self._cache[project.id] = result
        return result

    def get_latest_scan(self, project_id: str) -> ScanResult | None:
        """Return the cached result of the last scan for *project_id*, or None."""
        return self._cache.get(project_id)

    @staticmethod
    def _read_content(file_path: Path) -> str:
        """Return the file's text decoded as UTF-8, or "" when it cannot be read.

        Only I/O and decoding failures are treated as "no content"; any other
        exception indicates a programming error and is allowed to propagate.
        """
        try:
            return file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError):
            return ""

    def _parse_file(self, file_path: Path) -> dict[str, list[Any]]:
        """Dispatch *file_path* to the parser for its type; {} if unsupported.

        CSV and Markdown files are chosen by extension. YAML files are parsed
        only when the file name contains "openapi" or "api-contracts".
        """
        suffix = file_path.suffix.lower()
        if suffix == ".csv":
            return self._csv_parser.parse(file_path)
        if suffix == ".md":
            return self._md_parser.parse(file_path)
        if suffix in (".yaml", ".yml"):
            fname = file_path.name.lower()
            if "openapi" in fname or "api-contracts" in fname:
                return self._openapi_parser.parse(file_path)
        return {}

    @staticmethod
    def _build_summary(file_statuses: list[FileStatusEntry]) -> ScanSummary:
        """Count the entries per FileStatus and wrap the totals in a ScanSummary."""

        def count(status: FileStatus) -> int:
            # Equality (not hashing) is used so the comparison semantics of
            # ``fs.status`` stay exactly as they are for direct ``==`` checks.
            return sum(1 for fs in file_statuses if fs.status == status)

        return ScanSummary(
            total_files=len(file_statuses),
            ok=count(FileStatus.OK),
            sparse=count(FileStatus.SPARSE),
            missing=count(FileStatus.MISSING),
            placeholder_heavy=count(FileStatus.PLACEHOLDER_HEAVY),
            template_residue=count(FileStatus.TEMPLATE_RESIDUE),
        )