128 lines
4.5 KiB
Python
128 lines
4.5 KiB
Python
"""ScanService — orchestrates parsers, file status detection, and entity collection."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.modules.design.domain.services import DesignValidationService
|
|
from app.modules.design.domain.value_objects import FileStatus
|
|
from app.modules.project.domain.entities import Project
|
|
from app.modules.scanner.domain.entities import (
|
|
FileStatusEntry,
|
|
ScanResult,
|
|
ScanSummary,
|
|
)
|
|
from app.modules.scanner.infrastructure.parsers.csv_parser import CsvParser
|
|
from app.modules.scanner.infrastructure.parsers.md_parser import MdParser
|
|
from app.modules.scanner.infrastructure.parsers.openapi_parser import OpenapiParser
|
|
|
|
|
|
class ScanService:
    """Scan a project's design directory and produce a ScanResult.

    Every regular file below the design directory gets a FileStatusEntry.
    CSV, Markdown and OpenAPI-style YAML files are additionally handed to
    their parser and the parsed entities are merged per key. The most recent
    result for each project id is kept in an in-memory cache.
    """

    # Entity keys that ScanResult receives as a single object (first parsed
    # item, or None) instead of as a list.
    _SINGLETON_KEYS = frozenset({
        "scope_and_goals",
        "system_context",
        "solution_layer",
        "module_boundary_rule",
        "runtime_topology",
        "operational_baseline",
        "release_plan",
    })

    # YAML files are only parsed when their lowercased file name contains one
    # of these markers.
    _YAML_SUFFIXES = frozenset({".yaml", ".yml"})
    _OPENAPI_NAME_MARKERS = ("openapi", "api-contracts")

    def __init__(self) -> None:
        """Create the parsers and an empty per-project result cache."""
        self._csv_parser = CsvParser()
        self._md_parser = MdParser()
        self._openapi_parser = OpenapiParser()
        self._cache: dict[str, ScanResult] = {}

    def scan(self, project: Project) -> ScanResult:
        """Scan ``project.design_dir`` recursively and cache the result.

        Args:
            project: Project whose ``design_dir`` is walked and whose ``id``
                is used as the cache key.

        Returns:
            The assembled ScanResult, also retrievable afterwards through
            ``get_latest_scan(project.id)``.

        Exceptions raised by a parser or by ScanResult construction are not
        caught here and propagate to the caller.
        """
        design_dir = Path(project.design_dir)
        file_statuses: list[FileStatusEntry] = []
        all_entities: dict[str, list[Any]] = {}

        # sorted() keeps the walk order, and therefore the result, stable.
        for file_path in sorted(design_dir.rglob("*")):
            if not file_path.is_file():
                continue

            content = self._read_text(file_path)
            status = DesignValidationService.determine_file_status(
                content, str(file_path)
            )
            file_statuses.append(
                FileStatusEntry(
                    path=str(file_path.relative_to(design_dir)),
                    status=status,
                    # "".splitlines() is [], so empty content counts as 0.
                    content_lines=len(content.splitlines()),
                )
            )

            # Merge this file's entities into the per-key accumulators.
            for key, entities in self._parse_file(file_path).items():
                all_entities.setdefault(key, []).extend(entities)

        kwargs: dict[str, Any] = {
            "project_id": project.id,
            "scanned_at": datetime.now(timezone.utc),
            "file_statuses": file_statuses,
            "summary": self._build_summary(file_statuses),
        }
        for key, entities in all_entities.items():
            if key in self._SINGLETON_KEYS:
                kwargs[key] = entities[0] if entities else None
            else:
                kwargs[key] = entities

        result = ScanResult(**kwargs)
        self._cache[project.id] = result
        return result

    def get_latest_scan(self, project_id: str) -> ScanResult | None:
        """Return the cached result of the last scan for *project_id*, if any."""
        return self._cache.get(project_id)

    @staticmethod
    def _read_text(file_path: Path) -> str:
        """Return the file's UTF-8 text, or "" if it cannot be read or decoded."""
        try:
            return file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Best effort: unreadable or non-UTF-8 files are still listed in
            # the scan, with empty content.
            return ""

    def _parse_file(self, file_path: Path) -> dict[str, list[Any]]:
        """Dispatch *file_path* to the parser for its type; {} if none applies."""
        suffix = file_path.suffix.lower()
        if suffix == ".csv":
            return self._csv_parser.parse(file_path)
        if suffix == ".md":
            return self._md_parser.parse(file_path)
        if suffix in self._YAML_SUFFIXES:
            name = file_path.name.lower()
            if any(marker in name for marker in self._OPENAPI_NAME_MARKERS):
                return self._openapi_parser.parse(file_path)
        return {}

    @staticmethod
    def _build_summary(file_statuses: list[FileStatusEntry]) -> ScanSummary:
        """Count the entries per FileStatus and wrap the totals in a ScanSummary."""

        def tally(wanted: FileStatus) -> int:
            return sum(1 for entry in file_statuses if entry.status == wanted)

        return ScanSummary(
            total_files=len(file_statuses),
            ok=tally(FileStatus.OK),
            sparse=tally(FileStatus.SPARSE),
            missing=tally(FileStatus.MISSING),
            placeholder_heavy=tally(FileStatus.PLACEHOLDER_HEAVY),
            template_residue=tally(FileStatus.TEMPLATE_RESIDUE),
        )
|