feat(scanner): add CSV parser — maps 20 CSV file types to Design entities

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
openclaw 2026-03-23 16:22:21 +00:00
parent 6903f6e814
commit b64eb8aa06
2 changed files with 538 additions and 0 deletions

View File

@ -0,0 +1,353 @@
"""CSV parser — maps design CSV files to Design entity instances."""
from __future__ import annotations
import csv
from pathlib import Path
from typing import Any
from app.modules.design.domain.entities import (
Capability,
ChangeLogEntry,
CodebaseAlignment,
DataFlow,
DataSecurity,
DomainEntity,
DomainModule,
Entity,
Environment,
ExternalSystem,
Integration,
Module,
RuntimeComponent,
Scenario,
SharedTerm,
TechSelection,
TraceabilityLink,
UbiquitousTerm,
UserJourney,
ValueFlow,
)
def _split_space(value: str) -> list[str]:
"""Split a space-delimited string into a list, filtering empty strings."""
if not value or not value.strip():
return []
return value.strip().split()
class CsvParser:
    """Parse a design CSV file into Design entity instances.

    The (lower-cased) file name selects the entity type and every data row of
    the file becomes one instance of that type.  ``parse`` returns a dict whose
    key matches a ScanResult field name (e.g., 'capabilities', 'modules', etc.)
    and whose value is the list of instances built from the rows.
    """

    def parse(self, file_path: Path) -> dict[str, list[Any]]:
        """Parse ``file_path`` and return its entities keyed by result field.

        Args:
            file_path: Path of the CSV file to parse.

        Returns:
            A one-entry dict ``{field_name: [entity, ...]}``, or an empty dict
            when the file is deliberately skipped (api-contracts,
            module-boundary), cannot be opened / decoded / parsed as CSV, has
            no data rows, or its name matches no known CSV type.
        """
        fname = file_path.name.lower()
        stem = file_path.stem.lower()

        # Skip api-contracts CSV (handled by OpenAPI parser)
        if "api-contracts" in fname or "api_contracts" in fname:
            return {}
        # Skip module-boundary (this is an MD file concept)
        if "module-boundary" in fname or "module_boundary" in fname:
            return {}

        try:
            rows = self._read_rows(file_path)
        except (OSError, ValueError, csv.Error):
            # Best effort: a missing/unreadable file (OSError), undecodable
            # bytes (UnicodeDecodeError is a ValueError) or malformed CSV
            # (csv.Error) yields "nothing parsed" rather than an exception.
            return {}

        if not rows:
            return {}
        return self._dispatch(fname, stem, rows)

    @staticmethod
    def _read_rows(file_path: Path) -> list[dict[str, str]]:
        """Read every data row of a CSV file as a header-keyed dict.

        The file is decoded with ``utf-8-sig`` so that a leading byte-order
        mark is dropped instead of being glued onto the first column name
        (which would make every lookup of that column come back empty).
        Files without a BOM decode exactly as plain UTF-8.

        Raises:
            OSError: The file cannot be opened or read.
            ValueError: The content is not valid UTF-8.
            csv.Error: The content cannot be parsed as CSV.
        """
        with open(file_path, newline="", encoding="utf-8-sig") as f:
            return list(csv.DictReader(f))

    def _dispatch(self, fname: str, stem: str, rows: list[dict[str, str]]) -> dict[str, list[Any]]:
        """Pick the row parser for a file name and apply it to every row.

        Args:
            fname: Lower-cased file name including the extension.
            stem: Lower-cased file name without the extension.
            rows: Data rows as produced by ``csv.DictReader``.

        Returns:
            ``{field_name: [entity, ...]}`` for the first matching rule, or an
            empty dict when no rule matches.
        """

        def named(*needles: str) -> bool:
            """Return True when any of ``needles`` occurs in the file name."""
            return any(needle in fname for needle in needles)

        # Rules are tried top to bottom and the first match wins, so the order
        # is significant (e.g. the generic "modules.csv" suffix must come after
        # "domain-modules").
        rules = (
            (named("capability-map", "capability_map"), "capabilities", self._parse_capability),
            (named("value-flows", "value_flows"), "value_flows", self._parse_value_flow),
            (named("user-journeys", "user_journeys"), "user_journeys", self._parse_user_journey),
            (named("integrations"), "integrations", self._parse_integration),
            (named("external-systems", "external_systems"), "external_systems", self._parse_external_system),
            (named("codebase-alignment", "codebase_alignment"), "codebase_alignments", self._parse_codebase_alignment),
            (named("codebase-mapping", "codebase_mapping"), "codebase_alignments", self._parse_codebase_mapping),
            # entities.csv in data-architecture (not domain-entities)
            (
                stem == "01-entities" or (fname.endswith("entities.csv") and "domain" not in fname),
                "entities",
                self._parse_entity,
            ),
            (named("data-flows", "data_flows"), "data_flows", self._parse_data_flow),
            (named("data-security", "data_security"), "data_securities", self._parse_data_security),
            (named("technology-selection", "technology_selection"), "tech_selections", self._parse_tech_selection),
            (named("runtime-components", "runtime_components"), "runtime_components", self._parse_runtime_component),
            (named("environments"), "environments", self._parse_environment),
            # Exact file-name match, unlike the substring rules around it.
            (fname == "traceability.csv", "traceability_links", self._parse_traceability_link),
            (named("change-log", "change_log"), "change_log_entries", self._parse_change_log_entry),
            (named("shared-terminology", "shared_terminology"), "shared_terms", self._parse_shared_term),
            (named("ubiquitous-language", "ubiquitous_language"), "ubiquitous_terms", self._parse_ubiquitous_term),
            (named("scenarios-and-flows", "scenarios_and_flows"), "scenarios", self._parse_scenario),
            (named("domain-modules", "domain_modules"), "domain_modules", self._parse_domain_module),
            (named("domain-entities", "domain_entities"), "domain_entities", self._parse_domain_entity),
            # modules.csv in application-architecture
            (fname.endswith("modules.csv"), "modules", self._parse_module),
        )
        for matched, key, parse_row in rules:
            if matched:
                return {key: [parse_row(row) for row in rows]}
        return {}

    # ── Individual entity parsers ──

    @staticmethod
    def _g(row: dict[str, str], key: str) -> str:
        """Return the stripped value of ``key`` in ``row``.

        A missing key or a ``None`` value yields an empty string.
        """
        return (row.get(key) or "").strip()

    def _parse_capability(self, row: dict[str, str]) -> Capability:
        """Build a Capability from one row; ``related_value_flows`` is space-split."""
        return Capability(
            capability_id=self._g(row, "capability_id"),
            name=self._g(row, "capability_name"),
            description=self._g(row, "description"),
            priority=self._g(row, "priority"),
            phase=self._g(row, "phase"),
            related_value_flows=_split_space(self._g(row, "related_value_flows")),
        )

    def _parse_value_flow(self, row: dict[str, str]) -> ValueFlow:
        """Build a ValueFlow from one row; ``related_capabilities`` is space-split."""
        return ValueFlow(
            value_flow_id=self._g(row, "value_flow_id"),
            name=self._g(row, "value_flow_name"),
            trigger=self._g(row, "trigger"),
            actor=self._g(row, "actor"),
            steps=self._g(row, "steps"),
            outcome=self._g(row, "outcome"),
            phase=self._g(row, "phase"),
            related_capabilities=_split_space(self._g(row, "related_capabilities")),
        )

    def _parse_user_journey(self, row: dict[str, str]) -> UserJourney:
        """Build a UserJourney from one row; ``related_value_flows`` is space-split."""
        return UserJourney(
            journey_id=self._g(row, "journey_id"),
            name=self._g(row, "journey_name"),
            actor=self._g(row, "actor"),
            precondition=self._g(row, "precondition"),
            steps=self._g(row, "steps"),
            postcondition=self._g(row, "postcondition"),
            phase=self._g(row, "phase"),
            related_value_flows=_split_space(self._g(row, "related_value_flows")),
        )

    def _parse_module(self, row: dict[str, str]) -> Module:
        """Build a Module from one row; ``depends_on`` and ``capabilities`` are space-split."""
        return Module(
            module_id=self._g(row, "module_id"),
            name=self._g(row, "module_name"),
            layer=self._g(row, "layer"),
            description=self._g(row, "description"),
            phase=self._g(row, "phase"),
            depends_on=_split_space(self._g(row, "depends_on")),
            capabilities=_split_space(self._g(row, "capabilities")),
        )

    def _parse_integration(self, row: dict[str, str]) -> Integration:
        """Build an Integration from one row."""
        return Integration(
            integration_id=self._g(row, "integration_id"),
            source_id=self._g(row, "source_id"),
            target_id=self._g(row, "target_id"),
            target_type=self._g(row, "target_type"),
            direction=self._g(row, "direction"),
            protocol=self._g(row, "protocol"),
            trigger=self._g(row, "trigger"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_external_system(self, row: dict[str, str]) -> ExternalSystem:
        """Build an ExternalSystem from one row."""
        return ExternalSystem(
            system_id=self._g(row, "system_id"),
            name=self._g(row, "system_name"),
            type=self._g(row, "type"),
            protocol=self._g(row, "protocol"),
            direction=self._g(row, "direction"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_codebase_alignment(self, row: dict[str, str]) -> CodebaseAlignment:
        """Build a CodebaseAlignment from a codebase-alignment row."""
        return CodebaseAlignment(
            module_id=self._g(row, "module_id"),
            repo_root=self._g(row, "repo_root"),
            code_root=self._g(row, "code_root"),
            package_name=self._g(row, "package_name"),
        )

    def _parse_codebase_mapping(self, row: dict[str, str]) -> CodebaseAlignment:
        """Build a CodebaseAlignment from a codebase-mapping row.

        The mapping layout uses the ``code_path`` and ``package`` columns; no
        column is read for ``repo_root``, which is always left empty.
        """
        return CodebaseAlignment(
            module_id=self._g(row, "module_id"),
            repo_root="",
            code_root=self._g(row, "code_path"),
            package_name=self._g(row, "package"),
        )

    def _parse_entity(self, row: dict[str, str]) -> Entity:
        """Build an Entity from one row."""
        return Entity(
            entity_id=self._g(row, "entity_id"),
            name=self._g(row, "entity_name"),
            domain=self._g(row, "domain"),
            owner_module=self._g(row, "owner_module"),
            description=self._g(row, "description"),
            phase=self._g(row, "phase"),
            source_file=self._g(row, "source_file"),
        )

    def _parse_data_flow(self, row: dict[str, str]) -> DataFlow:
        """Build a DataFlow from one row."""
        return DataFlow(
            data_flow_id=self._g(row, "data_flow_id"),
            source=self._g(row, "source"),
            target=self._g(row, "target"),
            data_content=self._g(row, "data_content"),
            trigger=self._g(row, "trigger"),
            protocol=self._g(row, "protocol"),
            phase=self._g(row, "phase"),
            description=self._g(row, "description"),
        )

    def _parse_data_security(self, row: dict[str, str]) -> DataSecurity:
        """Build a DataSecurity from one row (``entities`` is kept as a raw string)."""
        return DataSecurity(
            security_id=self._g(row, "security_id"),
            sensitivity=self._g(row, "sensitivity"),
            entities=self._g(row, "entities"),
            protection=self._g(row, "protection_strategy"),
        )

    def _parse_tech_selection(self, row: dict[str, str]) -> TechSelection:
        """Build a TechSelection from one row."""
        return TechSelection(
            category=self._g(row, "category"),
            technology=self._g(row, "technology"),
            version=self._g(row, "version"),
            purpose=self._g(row, "purpose"),
            rationale=self._g(row, "rationale"),
            alternatives_considered=self._g(row, "alternatives_considered"),
            phase=self._g(row, "phase"),
        )

    def _parse_runtime_component(self, row: dict[str, str]) -> RuntimeComponent:
        """Build a RuntimeComponent from one row (``port`` is kept as a string)."""
        return RuntimeComponent(
            component_id=self._g(row, "component_id"),
            name=self._g(row, "component_name"),
            type=self._g(row, "type"),
            technology=self._g(row, "technology"),
            port=self._g(row, "port"),
        )

    def _parse_environment(self, row: dict[str, str]) -> Environment:
        """Build an Environment from one row."""
        return Environment(
            env_id=self._g(row, "env_id"),
            name=self._g(row, "env_name"),
            purpose=self._g(row, "purpose"),
            infra=self._g(row, "infra"),
        )

    def _parse_traceability_link(self, row: dict[str, str]) -> TraceabilityLink:
        """Build a TraceabilityLink from one row; the id-list columns are space-split."""
        return TraceabilityLink(
            trace_id=self._g(row, "trace_id"),
            capability_id=self._g(row, "capability_id"),
            module_id=self._g(row, "module_id"),
            entity_ids=_split_space(self._g(row, "entity_ids")),
            value_flow_ids=_split_space(self._g(row, "value_flow_ids")),
            notes=self._g(row, "notes"),
        )

    def _parse_change_log_entry(self, row: dict[str, str]) -> ChangeLogEntry:
        """Build a ChangeLogEntry from one row (``date`` is kept as a string)."""
        return ChangeLogEntry(
            change_id=self._g(row, "change_id"),
            date=self._g(row, "date"),
            scope=self._g(row, "scope"),
            description=self._g(row, "description"),
        )

    def _parse_shared_term(self, row: dict[str, str]) -> SharedTerm:
        """Build a SharedTerm from one row; ``used_by_domains`` is space-split."""
        return SharedTerm(
            term_id=self._g(row, "term_id"),
            term=self._g(row, "term"),
            english_term=self._g(row, "english_term"),
            definition=self._g(row, "definition"),
            # NOTE(review): the entity field is ``used_by_domains`` but the
            # column read is ``used_by_modules`` — confirm this matches the
            # CSV header and is not a naming slip.
            used_by_domains=_split_space(self._g(row, "used_by_modules")),
        )

    def _parse_ubiquitous_term(self, row: dict[str, str]) -> UbiquitousTerm:
        """Build a UbiquitousTerm from one row."""
        return UbiquitousTerm(
            term_id=self._g(row, "term_id"),
            term=self._g(row, "term"),
            english_term=self._g(row, "english_term"),
            code_symbol=self._g(row, "code_symbol"),
            domain=self._g(row, "domain"),
            definition=self._g(row, "definition"),
        )

    def _parse_scenario(self, row: dict[str, str]) -> Scenario:
        """Build a Scenario from one row; ``related_capabilities`` is space-split."""
        return Scenario(
            scenario_id=self._g(row, "scenario_id"),
            name=self._g(row, "scenario_name"),
            trigger=self._g(row, "trigger"),
            actors=self._g(row, "actors"),
            steps=self._g(row, "steps"),
            outcome=self._g(row, "outcome"),
            related_capabilities=_split_space(self._g(row, "related_capabilities")),
        )

    def _parse_domain_module(self, row: dict[str, str]) -> DomainModule:
        """Build a DomainModule from one row."""
        return DomainModule(
            module_id=self._g(row, "module_id"),
            module_name=self._g(row, "module_name"),
            domain=self._g(row, "domain"),
            description=self._g(row, "description"),
            layer_in_code=self._g(row, "layer_in_code"),
        )

    def _parse_domain_entity(self, row: dict[str, str]) -> DomainEntity:
        """Build a DomainEntity from one row (``key_attributes`` is kept as a raw string)."""
        return DomainEntity(
            entity_id=self._g(row, "entity_id"),
            entity_name=self._g(row, "entity_name"),
            type=self._g(row, "type"),
            description=self._g(row, "description"),
            key_attributes=self._g(row, "key_attributes"),
        )

View File

@ -0,0 +1,185 @@
"""Tests for scanner parsers (CSV, MD, YAML, OpenAPI)."""
from pathlib import Path
import pytest
from app.modules.scanner.infrastructure.parsers.csv_parser import CsvParser
# Root directory of the design documents that the CSV parser tests below read.
# NOTE(review): this is an absolute, machine-specific path, so every test that
# uses it depends on this exact checkout being present — consider deriving it
# from an environment variable or from the repository root instead.
DESIGN_DIR = Path("/workspace/arch-design-agent-skill-dashboard/design")
@pytest.fixture
def csv_parser():
    """Provide a fresh ``CsvParser`` instance to each test that requests it."""
    parser = CsvParser()
    return parser
# ── CSV Parser Tests ──
class TestCsvParserCapabilities:
    """Capability-map CSV rows become ``capabilities`` entries."""

    def test_parse_capability_map(self, csv_parser):
        """The first parsed capability has a CAP- id, a name and a list of value flows."""
        csv_path = DESIGN_DIR / "business-architecture" / "02-capability-map.csv"
        parsed = csv_parser.parse(csv_path)
        assert "capabilities" in parsed
        capabilities = parsed["capabilities"]
        assert len(capabilities) > 0
        first = capabilities[0]
        assert first.capability_id.startswith("CAP-")
        assert first.name  # should have a name
        assert isinstance(first.related_value_flows, list)

    def test_capability_related_value_flows_split(self, csv_parser):
        """A space-separated ``related_value_flows`` cell is split into ids."""
        csv_path = DESIGN_DIR / "business-architecture" / "02-capability-map.csv"
        capabilities = csv_parser.parse(csv_path)["capabilities"]
        # CAP-PROGRESS-DESIGN has "VF-02 VF-03" which should be split
        matches = [
            capability
            for capability in capabilities
            if capability.capability_id == "CAP-PROGRESS-DESIGN"
        ]
        assert len(matches) == 1
        assert matches[0].related_value_flows == ["VF-02", "VF-03"]
class TestCsvParserModules:
    """Application-architecture modules CSV rows become ``modules`` entries."""

    def test_parse_modules(self, csv_parser):
        """The first parsed module has a MOD- id and list-typed relation fields."""
        csv_path = DESIGN_DIR / "application-architecture" / "02-modules.csv"
        parsed = csv_parser.parse(csv_path)
        assert "modules" in parsed
        modules = parsed["modules"]
        assert len(modules) > 0
        first = modules[0]
        assert first.module_id.startswith("MOD-")
        assert isinstance(first.depends_on, list)
        assert isinstance(first.capabilities, list)

    def test_module_list_fields(self, csv_parser):
        """MOD-SCANNER depends on MOD-DESIGN and lists at least one capability."""
        csv_path = DESIGN_DIR / "application-architecture" / "02-modules.csv"
        modules = csv_parser.parse(csv_path)["modules"]
        matches = [module for module in modules if module.module_id == "MOD-SCANNER"]
        assert len(matches) == 1
        scanner_module = matches[0]
        assert "MOD-DESIGN" in scanner_module.depends_on
        assert len(scanner_module.capabilities) > 0
class TestCsvParserTraceability:
    """traceability.csv rows become ``traceability_links`` entries."""

    def test_parse_traceability(self, csv_parser):
        """The first parsed link has a TR- id and list-typed id fields."""
        csv_path = DESIGN_DIR / "traceability.csv"
        parsed = csv_parser.parse(csv_path)
        assert "traceability_links" in parsed
        trace_links = parsed["traceability_links"]
        assert len(trace_links) > 0
        first = trace_links[0]
        assert first.trace_id.startswith("TR-")
        assert isinstance(first.entity_ids, list)
        assert isinstance(first.value_flow_ids, list)

    def test_traceability_space_split(self, csv_parser):
        """A space-separated ``entity_ids`` cell is split into individual ids."""
        csv_path = DESIGN_DIR / "traceability.csv"
        trace_links = csv_parser.parse(csv_path)["traceability_links"]
        # TR-04 has many entity_ids space-separated
        matches = [link for link in trace_links if link.trace_id == "TR-04"]
        assert len(matches) == 1
        assert len(matches[0].entity_ids) > 5
class TestCsvParserOtherTypes:
    """Smoke tests: each remaining CSV type parses into a non-empty list under its key."""

    @staticmethod
    def _load(parser, key, *path_parts):
        """Parse the design file at ``path_parts`` and return the non-empty list under ``key``."""
        parsed = parser.parse(DESIGN_DIR.joinpath(*path_parts))
        assert key in parsed
        items = parsed[key]
        assert len(items) > 0
        return items

    def test_parse_value_flows(self, csv_parser):
        self._load(csv_parser, "value_flows", "business-architecture", "03-value-flows.csv")

    def test_parse_user_journeys(self, csv_parser):
        self._load(csv_parser, "user_journeys", "business-architecture", "04-user-journeys.csv")

    def test_parse_integrations(self, csv_parser):
        self._load(csv_parser, "integrations", "application-architecture", "03-integrations.csv")

    def test_parse_external_systems(self, csv_parser):
        self._load(csv_parser, "external_systems", "application-architecture", "01-external-systems.csv")

    def test_parse_entities(self, csv_parser):
        self._load(csv_parser, "entities", "data-architecture", "01-entities.csv")

    def test_parse_data_flows(self, csv_parser):
        self._load(csv_parser, "data_flows", "data-architecture", "02-data-flows.csv")

    def test_parse_data_security(self, csv_parser):
        self._load(csv_parser, "data_securities", "data-architecture", "03-data-security.csv")

    def test_parse_tech_selections(self, csv_parser):
        self._load(csv_parser, "tech_selections", "technology-architecture", "00-technology-selection.csv")

    def test_parse_runtime_components(self, csv_parser):
        self._load(csv_parser, "runtime_components", "technology-architecture", "01-runtime-components.csv")

    def test_parse_environments(self, csv_parser):
        self._load(csv_parser, "environments", "technology-architecture", "02-environments.csv")

    def test_parse_change_log(self, csv_parser):
        self._load(csv_parser, "change_log_entries", "change-log.csv")

    def test_parse_shared_terminology(self, csv_parser):
        shared_terms = self._load(csv_parser, "shared_terms", "domains", "_shared", "01-shared-terminology.csv")
        # Check used_by_domains is a list (space-split)
        first = shared_terms[0]
        assert isinstance(first.used_by_domains, list)
        assert len(first.used_by_domains) > 0

    def test_parse_ubiquitous_language(self, csv_parser):
        self._load(csv_parser, "ubiquitous_terms", "domains", "design", "02-ubiquitous-language.csv")

    def test_parse_scenarios(self, csv_parser):
        self._load(csv_parser, "scenarios", "domains", "design", "03-scenarios-and-flows.csv")

    def test_parse_domain_modules(self, csv_parser):
        self._load(csv_parser, "domain_modules", "domains", "design", "04-domain-modules.csv")

    def test_parse_domain_entities(self, csv_parser):
        self._load(csv_parser, "domain_entities", "domains", "design", "05-domain-entities.csv")

    def test_parse_codebase_alignment(self, csv_parser):
        self._load(csv_parser, "codebase_alignments", "application-architecture", "06-codebase-alignment.csv")

    def test_parse_codebase_mapping(self, csv_parser):
        self._load(csv_parser, "codebase_alignments", "domains", "design", "07-codebase-mapping.csv")
class TestCsvParserUnknown:
    """Files the parser cannot classify or read produce an empty result."""

    def test_unknown_csv_returns_empty(self, csv_parser, tmp_path):
        """A readable CSV whose name matches no known type parses to ``{}``."""
        unrecognized = tmp_path / "unknown-file.csv"
        unrecognized.write_text("col1,col2\nval1,val2\n")
        assert csv_parser.parse(unrecognized) == {}

    def test_nonexistent_file_returns_empty(self, csv_parser):
        """A path that does not exist parses to ``{}`` instead of raising."""
        missing = Path("/nonexistent/file.csv")
        assert csv_parser.parse(missing) == {}