"""CSV parser — maps design CSV files to Design entity instances.""" from __future__ import annotations import csv from pathlib import Path from typing import Any from app.modules.design.domain.entities import ( Capability, ChangeLogEntry, CodebaseAlignment, DataFlow, DataSecurity, DomainEntity, DomainModule, Entity, Environment, ExternalSystem, Integration, Module, RuntimeComponent, Scenario, SharedTerm, TechSelection, TraceabilityLink, UbiquitousTerm, UserJourney, ValueFlow, ) def _split_space(value: str) -> list[str]: """Split a space-delimited string into a list, filtering empty strings.""" if not value or not value.strip(): return [] return value.strip().split() class CsvParser: """Parse CSV file and return dict mapping entity type name to list of instances. Keys match ScanResult field names (e.g., 'capabilities', 'modules', etc.) """ def parse(self, file_path: Path) -> dict[str, list[Any]]: fname = file_path.name.lower() stem = file_path.stem.lower() # Skip api-contracts CSV (handled by OpenAPI parser) if "api-contracts" in fname or "api_contracts" in fname: return {} # Skip module-boundary (this is an MD file concept) if "module-boundary" in fname or "module_boundary" in fname: return {} try: with open(file_path, newline="", encoding="utf-8") as f: reader = csv.DictReader(f) rows = list(reader) except Exception: return {} if not rows: return {} return self._dispatch(fname, stem, rows) def _dispatch(self, fname: str, stem: str, rows: list[dict[str, str]]) -> dict[str, list[Any]]: if "capability-map" in fname or "capability_map" in fname: return {"capabilities": [self._parse_capability(r) for r in rows]} if "value-flows" in fname or "value_flows" in fname: return {"value_flows": [self._parse_value_flow(r) for r in rows]} if "user-journeys" in fname or "user_journeys" in fname: return {"user_journeys": [self._parse_user_journey(r) for r in rows]} if "integrations" in fname: return {"integrations": [self._parse_integration(r) for r in rows]} if "external-systems" in fname or "external_systems" in fname: return {"external_systems": [self._parse_external_system(r) for r in rows]} if "codebase-alignment" in fname or "codebase_alignment" in fname: return {"codebase_alignments": [self._parse_codebase_alignment(r) for r in rows]} if "codebase-mapping" in fname or "codebase_mapping" in fname: return {"codebase_alignments": [self._parse_codebase_mapping(r) for r in rows]} # entities.csv in data-architecture (not domain-entities) if stem == "01-entities" or (fname.endswith("entities.csv") and "domain" not in fname): return {"entities": [self._parse_entity(r) for r in rows]} if "data-flows" in fname or "data_flows" in fname: return {"data_flows": [self._parse_data_flow(r) for r in rows]} if "data-security" in fname or "data_security" in fname: return {"data_securities": [self._parse_data_security(r) for r in rows]} if "technology-selection" in fname or "technology_selection" in fname: return {"tech_selections": [self._parse_tech_selection(r) for r in rows]} if "runtime-components" in fname or "runtime_components" in fname: return {"runtime_components": [self._parse_runtime_component(r) for r in rows]} if "environments" in fname: return {"environments": [self._parse_environment(r) for r in rows]} if fname == "traceability.csv": return {"traceability_links": [self._parse_traceability_link(r) for r in rows]} if "change-log" in fname or "change_log" in fname: return {"change_log_entries": [self._parse_change_log_entry(r) for r in rows]} if "shared-terminology" in fname or "shared_terminology" in fname: return {"shared_terms": [self._parse_shared_term(r) for r in rows]} if "ubiquitous-language" in fname or "ubiquitous_language" in fname: return {"ubiquitous_terms": [self._parse_ubiquitous_term(r) for r in rows]} if "scenarios-and-flows" in fname or "scenarios_and_flows" in fname: return {"scenarios": [self._parse_scenario(r) for r in rows]} if "domain-modules" in fname or "domain_modules" in fname: return {"domain_modules": [self._parse_domain_module(r) for r in rows]} if "domain-entities" in fname or "domain_entities" in fname: return {"domain_entities": [self._parse_domain_entity(r) for r in rows]} # modules.csv in application-architecture if fname.endswith("modules.csv"): return {"modules": [self._parse_module(r) for r in rows]} return {} # ── Individual entity parsers ── @staticmethod def _g(row: dict[str, str], key: str) -> str: """Get a value from a row, defaulting to empty string.""" return (row.get(key) or "").strip() def _parse_capability(self, row: dict[str, str]) -> Capability: return Capability( capability_id=self._g(row, "capability_id"), name=self._g(row, "capability_name"), description=self._g(row, "description"), priority=self._g(row, "priority"), phase=self._g(row, "phase"), related_value_flows=_split_space(self._g(row, "related_value_flows")), ) def _parse_value_flow(self, row: dict[str, str]) -> ValueFlow: return ValueFlow( value_flow_id=self._g(row, "value_flow_id"), name=self._g(row, "value_flow_name"), trigger=self._g(row, "trigger"), actor=self._g(row, "actor"), steps=self._g(row, "steps"), outcome=self._g(row, "outcome"), phase=self._g(row, "phase"), related_capabilities=_split_space(self._g(row, "related_capabilities")), ) def _parse_user_journey(self, row: dict[str, str]) -> UserJourney: return UserJourney( journey_id=self._g(row, "journey_id"), name=self._g(row, "journey_name"), actor=self._g(row, "actor"), precondition=self._g(row, "precondition"), steps=self._g(row, "steps"), postcondition=self._g(row, "postcondition"), phase=self._g(row, "phase"), related_value_flows=_split_space(self._g(row, "related_value_flows")), ) def _parse_module(self, row: dict[str, str]) -> Module: return Module( module_id=self._g(row, "module_id"), name=self._g(row, "module_name"), layer=self._g(row, "layer"), description=self._g(row, "description"), phase=self._g(row, "phase"), depends_on=_split_space(self._g(row, "depends_on")), capabilities=_split_space(self._g(row, "capabilities")), ) def _parse_integration(self, row: dict[str, str]) -> Integration: return Integration( integration_id=self._g(row, "integration_id"), source_id=self._g(row, "source_id"), target_id=self._g(row, "target_id"), target_type=self._g(row, "target_type"), direction=self._g(row, "direction"), protocol=self._g(row, "protocol"), trigger=self._g(row, "trigger"), phase=self._g(row, "phase"), description=self._g(row, "description"), ) def _parse_external_system(self, row: dict[str, str]) -> ExternalSystem: return ExternalSystem( system_id=self._g(row, "system_id"), name=self._g(row, "system_name"), type=self._g(row, "type"), protocol=self._g(row, "protocol"), direction=self._g(row, "direction"), phase=self._g(row, "phase"), description=self._g(row, "description"), ) def _parse_codebase_alignment(self, row: dict[str, str]) -> CodebaseAlignment: return CodebaseAlignment( module_id=self._g(row, "module_id"), repo_root=self._g(row, "repo_root"), code_root=self._g(row, "code_root"), package_name=self._g(row, "package_name"), ) def _parse_codebase_mapping(self, row: dict[str, str]) -> CodebaseAlignment: return CodebaseAlignment( module_id=self._g(row, "module_id"), repo_root="", code_root=self._g(row, "code_path"), package_name=self._g(row, "package"), ) def _parse_entity(self, row: dict[str, str]) -> Entity: return Entity( entity_id=self._g(row, "entity_id"), name=self._g(row, "entity_name"), domain=self._g(row, "domain"), owner_module=self._g(row, "owner_module"), description=self._g(row, "description"), phase=self._g(row, "phase"), source_file=self._g(row, "source_file"), ) def _parse_data_flow(self, row: dict[str, str]) -> DataFlow: return DataFlow( data_flow_id=self._g(row, "data_flow_id"), source=self._g(row, "source"), target=self._g(row, "target"), data_content=self._g(row, "data_content"), trigger=self._g(row, "trigger"), protocol=self._g(row, "protocol"), phase=self._g(row, "phase"), description=self._g(row, "description"), ) def _parse_data_security(self, row: dict[str, str]) -> DataSecurity: return DataSecurity( security_id=self._g(row, "security_id"), sensitivity=self._g(row, "sensitivity"), entities=self._g(row, "entities"), protection=self._g(row, "protection_strategy"), ) def _parse_tech_selection(self, row: dict[str, str]) -> TechSelection: return TechSelection( category=self._g(row, "category"), technology=self._g(row, "technology"), version=self._g(row, "version"), purpose=self._g(row, "purpose"), rationale=self._g(row, "rationale"), alternatives_considered=self._g(row, "alternatives_considered"), phase=self._g(row, "phase"), ) def _parse_runtime_component(self, row: dict[str, str]) -> RuntimeComponent: return RuntimeComponent( component_id=self._g(row, "component_id"), name=self._g(row, "component_name"), type=self._g(row, "type"), technology=self._g(row, "technology"), port=self._g(row, "port"), ) def _parse_environment(self, row: dict[str, str]) -> Environment: return Environment( env_id=self._g(row, "env_id"), name=self._g(row, "env_name"), purpose=self._g(row, "purpose"), infra=self._g(row, "infra"), ) def _parse_traceability_link(self, row: dict[str, str]) -> TraceabilityLink: return TraceabilityLink( trace_id=self._g(row, "trace_id"), capability_id=self._g(row, "capability_id"), module_id=self._g(row, "module_id"), entity_ids=_split_space(self._g(row, "entity_ids")), value_flow_ids=_split_space(self._g(row, "value_flow_ids")), notes=self._g(row, "notes"), ) def _parse_change_log_entry(self, row: dict[str, str]) -> ChangeLogEntry: return ChangeLogEntry( change_id=self._g(row, "change_id"), date=self._g(row, "date"), scope=self._g(row, "scope"), description=self._g(row, "description"), ) def _parse_shared_term(self, row: dict[str, str]) -> SharedTerm: return SharedTerm( term_id=self._g(row, "term_id"), term=self._g(row, "term"), english_term=self._g(row, "english_term"), definition=self._g(row, "definition"), used_by_domains=_split_space(self._g(row, "used_by_modules")), ) def _parse_ubiquitous_term(self, row: dict[str, str]) -> UbiquitousTerm: return UbiquitousTerm( term_id=self._g(row, "term_id"), term=self._g(row, "term"), english_term=self._g(row, "english_term"), code_symbol=self._g(row, "code_symbol"), domain=self._g(row, "domain"), definition=self._g(row, "definition"), ) def _parse_scenario(self, row: dict[str, str]) -> Scenario: return Scenario( scenario_id=self._g(row, "scenario_id"), name=self._g(row, "scenario_name"), trigger=self._g(row, "trigger"), actors=self._g(row, "actors"), steps=self._g(row, "steps"), outcome=self._g(row, "outcome"), related_capabilities=_split_space(self._g(row, "related_capabilities")), ) def _parse_domain_module(self, row: dict[str, str]) -> DomainModule: return DomainModule( module_id=self._g(row, "module_id"), module_name=self._g(row, "module_name"), domain=self._g(row, "domain"), description=self._g(row, "description"), layer_in_code=self._g(row, "layer_in_code"), ) def _parse_domain_entity(self, row: dict[str, str]) -> DomainEntity: return DomainEntity( entity_id=self._g(row, "entity_id"), entity_name=self._g(row, "entity_name"), type=self._g(row, "type"), description=self._g(row, "description"), key_attributes=self._g(row, "key_attributes"), )