354 lines
14 KiB
Python
354 lines
14 KiB
Python
"""CSV parser — maps design CSV files to Design entity instances."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.modules.design.domain.entities import (
|
|
Capability,
|
|
ChangeLogEntry,
|
|
CodebaseAlignment,
|
|
DataFlow,
|
|
DataSecurity,
|
|
DomainEntity,
|
|
DomainModule,
|
|
Entity,
|
|
Environment,
|
|
ExternalSystem,
|
|
Integration,
|
|
Module,
|
|
RuntimeComponent,
|
|
Scenario,
|
|
SharedTerm,
|
|
TechSelection,
|
|
TraceabilityLink,
|
|
UbiquitousTerm,
|
|
UserJourney,
|
|
ValueFlow,
|
|
)
|
|
|
|
|
|
def _split_space(value: str) -> list[str]:
|
|
"""Split a space-delimited string into a list, filtering empty strings."""
|
|
if not value or not value.strip():
|
|
return []
|
|
return value.strip().split()
|
|
|
|
|
|
class CsvParser:
    """Parse CSV file and return dict mapping entity type name to list of instances.

    The lower-cased file name selects which entity type the rows are mapped
    to; every CSV row becomes one instance of that type.

    Keys match ScanResult field names (e.g., 'capabilities', 'modules', etc.)
    """

    def parse(self, file_path: Path) -> dict[str, list[Any]]:
        """Read ``file_path`` and convert its rows into entity instances.

        Args:
            file_path: Path of the CSV file. Its lower-cased name (and stem)
                decide which row parser is used.

        Returns:
            ``{result_key: [instances]}`` for a recognised file, or an empty
            dict when the file is deliberately skipped, cannot be read or
            decoded, contains no data rows, or matches no known name.
        """
        fname = file_path.name.lower()
        stem = file_path.stem.lower()

        # Skip api-contracts CSV (handled by OpenAPI parser)
        if "api-contracts" in fname or "api_contracts" in fname:
            return {}

        # Skip module-boundary (this is an MD file concept)
        if "module-boundary" in fname or "module_boundary" in fname:
            return {}

        try:
            # "utf-8-sig" drops a leading byte-order mark if present (and reads
            # BOM-less files unchanged). With plain "utf-8" the BOM would be
            # glued to the first header name, so lookups of that column would
            # silently come back empty.
            with open(file_path, newline="", encoding="utf-8-sig") as f:
                rows = list(csv.DictReader(f))
        except (OSError, ValueError, csv.Error):
            # Best effort: an unreadable, undecodable (UnicodeDecodeError is a
            # ValueError) or malformed file contributes nothing.
            return {}

        if not rows:
            return {}

        return self._dispatch(fname, stem, rows)

    def _dispatch(self, fname: str, stem: str, rows: list[dict[str, str]]) -> dict[str, list[Any]]:
        """Pick the row parser that matches the file name and apply it to ``rows``.

        Args:
            fname: Lower-cased file name including the extension.
            stem: Lower-cased file name without the extension.
            rows: Rows as produced by ``csv.DictReader``.

        Returns:
            A single-entry dict for the first matching rule, or an empty dict
            when no rule matches.
        """

        def named(*tokens: str) -> bool:
            """Return True when any of ``tokens`` occurs in the file name."""
            return any(token in fname for token in tokens)

        # (matched, result key, row parser). Order matters: the first matching
        # rule wins, e.g. "domain-modules" must be tested before the generic
        # "modules.csv" suffix rule.
        rules = (
            (named("capability-map", "capability_map"), "capabilities", self._parse_capability),
            (named("value-flows", "value_flows"), "value_flows", self._parse_value_flow),
            (named("user-journeys", "user_journeys"), "user_journeys", self._parse_user_journey),
            (named("integrations"), "integrations", self._parse_integration),
            (named("external-systems", "external_systems"), "external_systems", self._parse_external_system),
            (named("codebase-alignment", "codebase_alignment"), "codebase_alignments", self._parse_codebase_alignment),
            (named("codebase-mapping", "codebase_mapping"), "codebase_alignments", self._parse_codebase_mapping),
            # entities.csv in data-architecture (not domain-entities)
            (
                stem == "01-entities" or (fname.endswith("entities.csv") and "domain" not in fname),
                "entities",
                self._parse_entity,
            ),
            (named("data-flows", "data_flows"), "data_flows", self._parse_data_flow),
            (named("data-security", "data_security"), "data_securities", self._parse_data_security),
            (named("technology-selection", "technology_selection"), "tech_selections", self._parse_tech_selection),
            (named("runtime-components", "runtime_components"), "runtime_components", self._parse_runtime_component),
            (named("environments"), "environments", self._parse_environment),
            (fname == "traceability.csv", "traceability_links", self._parse_traceability_link),
            (named("change-log", "change_log"), "change_log_entries", self._parse_change_log_entry),
            (named("shared-terminology", "shared_terminology"), "shared_terms", self._parse_shared_term),
            (named("ubiquitous-language", "ubiquitous_language"), "ubiquitous_terms", self._parse_ubiquitous_term),
            (named("scenarios-and-flows", "scenarios_and_flows"), "scenarios", self._parse_scenario),
            (named("domain-modules", "domain_modules"), "domain_modules", self._parse_domain_module),
            (named("domain-entities", "domain_entities"), "domain_entities", self._parse_domain_entity),
            # modules.csv in application-architecture
            (fname.endswith("modules.csv"), "modules", self._parse_module),
        )

        for matched, key, parse_row in rules:
            if matched:
                return {key: [parse_row(row) for row in rows]}

        return {}

    # ── Individual entity parsers ──

    @staticmethod
    def _g(row: dict[str, str], key: str) -> str:
        """Return the stripped value of ``key`` in ``row``.

        A missing key, a ``None`` value and an empty value all yield ``""``.
        """
        return (row.get(key) or "").strip()

    def _parse_capability(self, row: dict[str, str]) -> Capability:
        """Build a ``Capability`` from one CSV row."""
        return Capability(
            capability_id=self._g(row, "capability_id"),
            name=self._g(row, "capability_name"),
            description=self._g(row, "description"),
            priority=self._g(row, "priority"),
            phase=self._g(row, "phase"),
            related_value_flows=_split_space(self._g(row, "related_value_flows")),
        )

    def _parse_value_flow(self, row: dict[str, str]) -> ValueFlow:
        """Build a ``ValueFlow`` from one CSV row."""
        return ValueFlow(
            value_flow_id=self._g(row, "value_flow_id"),
            name=self._g(row, "value_flow_name"),
            trigger=self._g(row, "trigger"),
            actor=self._g(row, "actor"),
            steps=self._g(row, "steps"),
            outcome=self._g(row, "outcome"),
            phase=self._g(row, "phase"),
            related_capabilities=_split_space(self._g(row, "related_capabilities")),
        )

    def _parse_user_journey(self, row: dict[str, str]) -> UserJourney:
        """Build a ``UserJourney`` from one CSV row."""
        return UserJourney(
            journey_id=self._g(row, "journey_id"),
            name=self._g(row, "journey_name"),
            actor=self._g(row, "actor"),
            precondition=self._g(row, "precondition"),
            steps=self._g(row, "steps"),
            postcondition=self._g(row, "postcondition"),
            phase=self._g(row, "phase"),
            related_value_flows=_split_space(self._g(row, "related_value_flows")),
        )

    def _parse_module(self, row: dict[str, str]) -> Module:
        """Build a ``Module`` from one CSV row."""
        return Module(
            module_id=self._g(row, "module_id"),
            name=self._g(row, "module_name"),
            layer=self._g(row, "layer"),
            description=self._g(row, "description"),
            phase=self._g(row, "phase"),
            depends_on=_split_space(self._g(row, "depends_on")),
            capabilities=_split_space(self._g(row, "capabilities")),
        )

    def _parse_integration(self, row: dict[str, str]) -> Integration:
        """Build an ``Integration`` from one CSV row."""
        return Integration(
            integration_id=self._g(row, "integration_id"),
            source_id=self._g(row, "source_id"),
            target_id=self._g(row, "target_id"),
            target_type=self._g(row, "target_type"),
            direction=self._g(row, "direction"),
            protocol=self._g(row, "protocol"),
            trigger=self._g(row, "trigger"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_external_system(self, row: dict[str, str]) -> ExternalSystem:
        """Build an ``ExternalSystem`` from one CSV row."""
        return ExternalSystem(
            system_id=self._g(row, "system_id"),
            name=self._g(row, "system_name"),
            type=self._g(row, "type"),
            protocol=self._g(row, "protocol"),
            direction=self._g(row, "direction"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_codebase_alignment(self, row: dict[str, str]) -> CodebaseAlignment:
        """Build a ``CodebaseAlignment`` from a codebase-alignment row."""
        return CodebaseAlignment(
            module_id=self._g(row, "module_id"),
            repo_root=self._g(row, "repo_root"),
            code_root=self._g(row, "code_root"),
            package_name=self._g(row, "package_name"),
        )

    def _parse_codebase_mapping(self, row: dict[str, str]) -> CodebaseAlignment:
        """Build a ``CodebaseAlignment`` from a codebase-mapping row.

        This layout uses the ``code_path`` and ``package`` columns, and
        ``repo_root`` is always set to an empty string.
        """
        return CodebaseAlignment(
            module_id=self._g(row, "module_id"),
            repo_root="",
            code_root=self._g(row, "code_path"),
            package_name=self._g(row, "package"),
        )

    def _parse_entity(self, row: dict[str, str]) -> Entity:
        """Build an ``Entity`` from one CSV row."""
        return Entity(
            entity_id=self._g(row, "entity_id"),
            name=self._g(row, "entity_name"),
            domain=self._g(row, "domain"),
            owner_module=self._g(row, "owner_module"),
            description=self._g(row, "description"),
            phase=self._g(row, "phase"),
            source_file=self._g(row, "source_file"),
        )

    def _parse_data_flow(self, row: dict[str, str]) -> DataFlow:
        """Build a ``DataFlow`` from one CSV row."""
        return DataFlow(
            data_flow_id=self._g(row, "data_flow_id"),
            source=self._g(row, "source"),
            target=self._g(row, "target"),
            data_content=self._g(row, "data_content"),
            trigger=self._g(row, "trigger"),
            protocol=self._g(row, "protocol"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_data_security(self, row: dict[str, str]) -> DataSecurity:
        """Build a ``DataSecurity`` from one CSV row."""
        return DataSecurity(
            security_id=self._g(row, "security_id"),
            sensitivity=self._g(row, "sensitivity"),
            entities=self._g(row, "entities"),
            protection=self._g(row, "protection_strategy"),
        )

    def _parse_tech_selection(self, row: dict[str, str]) -> TechSelection:
        """Build a ``TechSelection`` from one CSV row."""
        return TechSelection(
            category=self._g(row, "category"),
            technology=self._g(row, "technology"),
            version=self._g(row, "version"),
            purpose=self._g(row, "purpose"),
            rationale=self._g(row, "rationale"),
            alternatives_considered=self._g(row, "alternatives_considered"),
            phase=self._g(row, "phase"),
        )

    def _parse_runtime_component(self, row: dict[str, str]) -> RuntimeComponent:
        """Build a ``RuntimeComponent`` from one CSV row."""
        return RuntimeComponent(
            component_id=self._g(row, "component_id"),
            name=self._g(row, "component_name"),
            type=self._g(row, "type"),
            technology=self._g(row, "technology"),
            port=self._g(row, "port"),
        )

    def _parse_environment(self, row: dict[str, str]) -> Environment:
        """Build an ``Environment`` from one CSV row."""
        return Environment(
            env_id=self._g(row, "env_id"),
            name=self._g(row, "env_name"),
            purpose=self._g(row, "purpose"),
            infra=self._g(row, "infra"),
        )

    def _parse_traceability_link(self, row: dict[str, str]) -> TraceabilityLink:
        """Build a ``TraceabilityLink`` from one CSV row."""
        return TraceabilityLink(
            trace_id=self._g(row, "trace_id"),
            capability_id=self._g(row, "capability_id"),
            module_id=self._g(row, "module_id"),
            entity_ids=_split_space(self._g(row, "entity_ids")),
            value_flow_ids=_split_space(self._g(row, "value_flow_ids")),
            notes=self._g(row, "notes"),
        )

    def _parse_change_log_entry(self, row: dict[str, str]) -> ChangeLogEntry:
        """Build a ``ChangeLogEntry`` from one CSV row."""
        return ChangeLogEntry(
            change_id=self._g(row, "change_id"),
            date=self._g(row, "date"),
            scope=self._g(row, "scope"),
            description=self._g(row, "description"),
        )

    def _parse_shared_term(self, row: dict[str, str]) -> SharedTerm:
        """Build a ``SharedTerm`` from one CSV row."""
        return SharedTerm(
            term_id=self._g(row, "term_id"),
            term=self._g(row, "term"),
            english_term=self._g(row, "english_term"),
            definition=self._g(row, "definition"),
            # NOTE(review): the field is `used_by_domains` but the column read
            # is `used_by_modules` — confirm against the CSV schema.
            used_by_domains=_split_space(self._g(row, "used_by_modules")),
        )

    def _parse_ubiquitous_term(self, row: dict[str, str]) -> UbiquitousTerm:
        """Build a ``UbiquitousTerm`` from one CSV row."""
        return UbiquitousTerm(
            term_id=self._g(row, "term_id"),
            term=self._g(row, "term"),
            english_term=self._g(row, "english_term"),
            code_symbol=self._g(row, "code_symbol"),
            domain=self._g(row, "domain"),
            definition=self._g(row, "definition"),
        )

    def _parse_scenario(self, row: dict[str, str]) -> Scenario:
        """Build a ``Scenario`` from one CSV row."""
        return Scenario(
            scenario_id=self._g(row, "scenario_id"),
            name=self._g(row, "scenario_name"),
            trigger=self._g(row, "trigger"),
            actors=self._g(row, "actors"),
            steps=self._g(row, "steps"),
            outcome=self._g(row, "outcome"),
            related_capabilities=_split_space(self._g(row, "related_capabilities")),
        )

    def _parse_domain_module(self, row: dict[str, str]) -> DomainModule:
        """Build a ``DomainModule`` from one CSV row."""
        return DomainModule(
            module_id=self._g(row, "module_id"),
            module_name=self._g(row, "module_name"),
            domain=self._g(row, "domain"),
            description=self._g(row, "description"),
            layer_in_code=self._g(row, "layer_in_code"),
        )

    def _parse_domain_entity(self, row: dict[str, str]) -> DomainEntity:
        """Build a ``DomainEntity`` from one CSV row."""
        return DomainEntity(
            entity_id=self._g(row, "entity_id"),
            entity_name=self._g(row, "entity_name"),
            type=self._g(row, "type"),
            description=self._g(row, "description"),
            key_attributes=self._g(row, "key_attributes"),
        )
|