"""ScanService — orchestrates parsers, file status detection, and entity collection.""" from __future__ import annotations from datetime import datetime, timezone from pathlib import Path from typing import Any from app.modules.design.domain.services import DesignValidationService from app.modules.design.domain.value_objects import FileStatus from app.modules.project.domain.entities import Project from app.modules.scanner.domain.entities import ( FileStatusEntry, ScanResult, ScanSummary, ) from app.modules.scanner.infrastructure.parsers.csv_parser import CsvParser from app.modules.scanner.infrastructure.parsers.md_parser import MdParser from app.modules.scanner.infrastructure.parsers.openapi_parser import OpenapiParser class ScanService: """Scan a project's design directory and produce a ScanResult.""" def __init__(self) -> None: self._csv_parser = CsvParser() self._md_parser = MdParser() self._openapi_parser = OpenapiParser() self._cache: dict[str, ScanResult] = {} def scan(self, project: Project) -> ScanResult: design_dir = Path(project.design_dir) file_statuses: list[FileStatusEntry] = [] all_entities: dict[str, list[Any]] = {} # Walk design directory recursively for file_path in sorted(design_dir.rglob("*")): if not file_path.is_file(): continue # Determine file status try: content = file_path.read_text(encoding="utf-8") except Exception: content = "" status = DesignValidationService.determine_file_status( content, str(file_path) ) lines = len(content.splitlines()) if content else 0 rel_path = str(file_path.relative_to(design_dir)) file_statuses.append(FileStatusEntry( path=rel_path, status=status, content_lines=lines, )) # Dispatch to appropriate parser parsed: dict[str, list[Any]] = {} suffix = file_path.suffix.lower() fname = file_path.name.lower() if suffix == ".csv": parsed = self._csv_parser.parse(file_path) elif suffix == ".md": parsed = self._md_parser.parse(file_path) elif suffix == ".yaml" or suffix == ".yml": if "openapi" in fname or "api-contracts" in fname: parsed = self._openapi_parser.parse(file_path) # Merge parsed entities for key, entities in parsed.items(): if key not in all_entities: all_entities[key] = [] all_entities[key].extend(entities) # Build summary summary = self._build_summary(file_statuses) # Assemble ScanResult # Singleton fields (take first item from list or None) singleton_keys = { "scope_and_goals", "system_context", "solution_layer", "module_boundary_rule", "runtime_topology", "operational_baseline", "release_plan", } kwargs: dict[str, Any] = { "project_id": project.id, "scanned_at": datetime.now(timezone.utc), "file_statuses": file_statuses, "summary": summary, } for key, entities in all_entities.items(): if key in singleton_keys: kwargs[key] = entities[0] if entities else None else: kwargs[key] = entities result = ScanResult(**kwargs) self._cache[project.id] = result return result def get_latest_scan(self, project_id: str) -> ScanResult | None: return self._cache.get(project_id) @staticmethod def _build_summary(file_statuses: list[FileStatusEntry]) -> ScanSummary: ok = sum(1 for fs in file_statuses if fs.status == FileStatus.OK) sparse = sum(1 for fs in file_statuses if fs.status == FileStatus.SPARSE) missing = sum(1 for fs in file_statuses if fs.status == FileStatus.MISSING) placeholder_heavy = sum( 1 for fs in file_statuses if fs.status == FileStatus.PLACEHOLDER_HEAVY ) template_residue = sum( 1 for fs in file_statuses if fs.status == FileStatus.TEMPLATE_RESIDUE ) return ScanSummary( total_files=len(file_statuses), ok=ok, sparse=sparse, missing=missing, placeholder_heavy=placeholder_heavy, template_residue=template_residue, )