From b64eb8aa06c9eb5a6efb8694db697edfef1d89c9 Mon Sep 17 00:00:00 2001
From: openclaw
Date: Mon, 23 Mar 2026 16:22:21 +0000
Subject: [PATCH] =?UTF-8?q?feat(scanner):=20add=20CSV=20parser=20=E2=80=94?=
 =?UTF-8?q?=20maps=2020=20CSV=20file=20types=20to=20Design=20entities?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-Authored-By: Claude Opus 4.6
---
 .../infrastructure/parsers/csv_parser.py      | 384 ++++++++++++++++++
 backend/tests/test_scanner_parsers.py         | 202 +++++++++
 2 files changed, 586 insertions(+)
 create mode 100644 backend/tests/test_scanner_parsers.py

diff --git a/backend/app/modules/scanner/infrastructure/parsers/csv_parser.py b/backend/app/modules/scanner/infrastructure/parsers/csv_parser.py
index e69de29..ae0f23d 100644
--- a/backend/app/modules/scanner/infrastructure/parsers/csv_parser.py
+++ b/backend/app/modules/scanner/infrastructure/parsers/csv_parser.py
@@ -0,0 +1,384 @@
+"""CSV parser — maps design CSV files to Design entity instances."""
+
+from __future__ import annotations
+
+import csv
+import logging
+from pathlib import Path
+from typing import Any
+
+from app.modules.design.domain.entities import (
+    Capability,
+    ChangeLogEntry,
+    CodebaseAlignment,
+    DataFlow,
+    DataSecurity,
+    DomainEntity,
+    DomainModule,
+    Entity,
+    Environment,
+    ExternalSystem,
+    Integration,
+    Module,
+    RuntimeComponent,
+    Scenario,
+    SharedTerm,
+    TechSelection,
+    TraceabilityLink,
+    UbiquitousTerm,
+    UserJourney,
+    ValueFlow,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def _split_space(value: str) -> list[str]:
+    """Split a whitespace-delimited string into a list of non-empty tokens.
+
+    ``str.split()`` with no separator already collapses runs of whitespace and
+    drops leading/trailing blanks, so empty or blank input yields ``[]``.
+    """
+    return value.split() if value else []
+
+
+class CsvParser:
+    """Parse a design CSV file into Design entity instances.
+
+    The entity type is chosen from the file name. ``parse`` returns a dict
+    mapping an entity type name to the list of instances built from the rows.
+    Keys match ScanResult field names (e.g., 'capabilities', 'modules', etc.)
+    """
+
+    def parse(self, file_path: Path) -> dict[str, list[Any]]:
+        """Read ``file_path`` and build the entities its file name maps to.
+
+        Returns an empty dict when the file is deliberately skipped, cannot be
+        read or decoded, contains no data rows, or has an unrecognised name.
+        """
+        fname = file_path.name.lower()
+        stem = file_path.stem.lower()
+
+        # Skip api-contracts CSV (handled by OpenAPI parser)
+        if "api-contracts" in fname or "api_contracts" in fname:
+            return {}
+
+        # Skip module-boundary (this is an MD file concept)
+        if "module-boundary" in fname or "module_boundary" in fname:
+            return {}
+
+        try:
+            # utf-8-sig strips a leading BOM (written by some spreadsheet
+            # exports); otherwise the BOM is glued onto the first header name
+            # and that column silently reads as empty for every row.
+            with open(file_path, newline="", encoding="utf-8-sig") as f:
+                rows = list(csv.DictReader(f))
+        except (OSError, UnicodeError, csv.Error) as exc:
+            # Best-effort scan: an unreadable file must not abort the run, but
+            # leave a trace instead of swallowing the failure silently.
+            logger.warning("Skipping unreadable CSV %s: %s", file_path, exc)
+            return {}
+
+        if not rows:
+            return {}
+
+        return self._dispatch(fname, stem, rows)
+
+    def _dispatch(self, fname: str, stem: str, rows: list[dict[str, str]]) -> dict[str, list[Any]]:
+        """Route ``rows`` to the entity parser selected by the file name.
+
+        ``fname`` and ``stem`` are the lower-cased file name and stem. Matching
+        is by substring and accepts both hyphen and underscore spellings. The
+        checks run in order, so the generic ``modules.csv`` suffix test stays
+        after the more specific ``domain-modules`` test. Returns an empty dict
+        when no rule matches.
+        """
+        if "capability-map" in fname or "capability_map" in fname:
+            return {"capabilities": [self._parse_capability(r) for r in rows]}
+
+        if "value-flows" in fname or "value_flows" in fname:
+            return {"value_flows": [self._parse_value_flow(r) for r in rows]}
+
+        if "user-journeys" in fname or "user_journeys" in fname:
+            return {"user_journeys": [self._parse_user_journey(r) for r in rows]}
+
+        if "integrations" in fname:
+            return {"integrations": [self._parse_integration(r) for r in rows]}
+
+        if "external-systems" in fname or "external_systems" in fname:
+            return {"external_systems": [self._parse_external_system(r) for r in rows]}
+
+        if "codebase-alignment" in fname or "codebase_alignment" in fname:
+            return {"codebase_alignments": [self._parse_codebase_alignment(r) for r in rows]}
+
+        if "codebase-mapping" in fname or "codebase_mapping" in fname:
+            return {"codebase_alignments": [self._parse_codebase_mapping(r) for r in rows]}
+
+        # entities.csv in data-architecture (not domain-entities)
+        if stem == "01-entities" or (fname.endswith("entities.csv") and "domain" not in fname):
+            return {"entities": [self._parse_entity(r) for r in rows]}
+
+        if "data-flows" in fname or "data_flows" in fname:
+            return {"data_flows": [self._parse_data_flow(r) for r in rows]}
+
+        if "data-security" in fname or "data_security" in fname:
+            return {"data_securities": [self._parse_data_security(r) for r in rows]}
+
+        if "technology-selection" in fname or "technology_selection" in fname:
+            return {"tech_selections": [self._parse_tech_selection(r) for r in rows]}
+
+        if "runtime-components" in fname or "runtime_components" in fname:
+            return {"runtime_components": [self._parse_runtime_component(r) for r in rows]}
+
+        if "environments" in fname:
+            return {"environments": [self._parse_environment(r) for r in rows]}
+
+        if fname == "traceability.csv":
+            return {"traceability_links": [self._parse_traceability_link(r) for r in rows]}
+
+        if "change-log" in fname or "change_log" in fname:
+            return {"change_log_entries": [self._parse_change_log_entry(r) for r in rows]}
+
+        if "shared-terminology" in fname or "shared_terminology" in fname:
+            return {"shared_terms": [self._parse_shared_term(r) for r in rows]}
+
+        if "ubiquitous-language" in fname or "ubiquitous_language" in fname:
+            return {"ubiquitous_terms": [self._parse_ubiquitous_term(r) for r in rows]}
+
+        if "scenarios-and-flows" in fname or "scenarios_and_flows" in fname:
+            return {"scenarios": [self._parse_scenario(r) for r in rows]}
+
+        if "domain-modules" in fname or "domain_modules" in fname:
+            return {"domain_modules": [self._parse_domain_module(r) for r in rows]}
+
+        if "domain-entities" in fname or "domain_entities" in fname:
+            return {"domain_entities": [self._parse_domain_entity(r) for r in rows]}
+
+        # modules.csv in application-architecture
+        if fname.endswith("modules.csv"):
+            return {"modules": [self._parse_module(r) for r in rows]}
+
+        return {}
+
+    # ── Individual entity parsers ──
+
+    @staticmethod
+    def _g(row: dict[str, str], key: str) -> str:
+        """Get a stripped value from a row, defaulting to empty string.
+
+        The ``or ""`` also covers ``None``, which ``csv.DictReader`` fills in
+        for columns missing from a short row.
+        """
+        return (row.get(key) or "").strip()
+
+    def _parse_capability(self, row: dict[str, str]) -> Capability:
+        return Capability(
+            capability_id=self._g(row, "capability_id"),
+            name=self._g(row, "capability_name"),
+            description=self._g(row, "description"),
+            priority=self._g(row, "priority"),
+            phase=self._g(row, "phase"),
+            related_value_flows=_split_space(self._g(row, "related_value_flows")),
+        )
+
+    def _parse_value_flow(self, row: dict[str, str]) -> ValueFlow:
+        return ValueFlow(
+            value_flow_id=self._g(row, "value_flow_id"),
+            name=self._g(row, "value_flow_name"),
+            trigger=self._g(row, "trigger"),
+            actor=self._g(row, "actor"),
+            steps=self._g(row, "steps"),
+            outcome=self._g(row, "outcome"),
+            phase=self._g(row, "phase"),
+            related_capabilities=_split_space(self._g(row, "related_capabilities")),
+        )
+
+    def _parse_user_journey(self, row: dict[str, str]) -> UserJourney:
+        return UserJourney(
+            journey_id=self._g(row, "journey_id"),
+            name=self._g(row, "journey_name"),
+            actor=self._g(row, "actor"),
+            precondition=self._g(row, "precondition"),
+            steps=self._g(row, "steps"),
+            postcondition=self._g(row, "postcondition"),
+            phase=self._g(row, "phase"),
+            related_value_flows=_split_space(self._g(row, "related_value_flows")),
+        )
+
+    def _parse_module(self, row: dict[str, str]) -> Module:
+        return Module(
+            module_id=self._g(row, "module_id"),
+            name=self._g(row, "module_name"),
+            layer=self._g(row, "layer"),
+            description=self._g(row, "description"),
+            phase=self._g(row, "phase"),
+            depends_on=_split_space(self._g(row, "depends_on")),
+            capabilities=_split_space(self._g(row, "capabilities")),
+        )
+
+    def _parse_integration(self, row: dict[str, str]) -> Integration:
+        return Integration(
+            integration_id=self._g(row, "integration_id"),
+            source_id=self._g(row, "source_id"),
+            target_id=self._g(row, "target_id"),
+            target_type=self._g(row, "target_type"),
+            direction=self._g(row, "direction"),
+            protocol=self._g(row, "protocol"),
+            trigger=self._g(row, "trigger"),
+            phase=self._g(row, "phase"),
+            description=self._g(row, "description"),
+        )
+
+    def _parse_external_system(self, row: dict[str, str]) -> ExternalSystem:
+        return ExternalSystem(
+            system_id=self._g(row, "system_id"),
+            name=self._g(row, "system_name"),
+            type=self._g(row, "type"),
+            protocol=self._g(row, "protocol"),
+            direction=self._g(row, "direction"),
+            phase=self._g(row, "phase"),
+            description=self._g(row, "description"),
+        )
+
+    def _parse_codebase_alignment(self, row: dict[str, str]) -> CodebaseAlignment:
+        return CodebaseAlignment(
+            module_id=self._g(row, "module_id"),
+            repo_root=self._g(row, "repo_root"),
+            code_root=self._g(row, "code_root"),
+            package_name=self._g(row, "package_name"),
+        )
+
+    def _parse_codebase_mapping(self, row: dict[str, str]) -> CodebaseAlignment:
+        return CodebaseAlignment(
+            module_id=self._g(row, "module_id"),
+            repo_root="",
+            code_root=self._g(row, "code_path"),
+            package_name=self._g(row, "package"),
+        )
+
+    def _parse_entity(self, row: dict[str, str]) -> Entity:
+        return Entity(
+            entity_id=self._g(row, "entity_id"),
+            name=self._g(row, "entity_name"),
+            domain=self._g(row, "domain"),
+            owner_module=self._g(row, "owner_module"),
+            description=self._g(row, "description"),
+            phase=self._g(row, "phase"),
+            source_file=self._g(row, "source_file"),
+        )
+
+    def _parse_data_flow(self, row: dict[str, str]) -> DataFlow:
+        return DataFlow(
+            data_flow_id=self._g(row, "data_flow_id"),
+            source=self._g(row, "source"),
+            target=self._g(row, "target"),
+            data_content=self._g(row, "data_content"),
+            trigger=self._g(row, "trigger"),
+            protocol=self._g(row, "protocol"),
+            phase=self._g(row, "phase"),
+            description=self._g(row, "description"),
+        )
+
+    def _parse_data_security(self, row: dict[str, str]) -> DataSecurity:
+        return DataSecurity(
+            security_id=self._g(row, "security_id"),
+            sensitivity=self._g(row, "sensitivity"),
+            entities=self._g(row, "entities"),
+            protection=self._g(row, "protection_strategy"),
+        )
+
+    def _parse_tech_selection(self, row: dict[str, str]) -> TechSelection:
+        return TechSelection(
+            category=self._g(row, "category"),
+            technology=self._g(row, "technology"),
+            version=self._g(row, "version"),
+            purpose=self._g(row, "purpose"),
+            rationale=self._g(row, "rationale"),
+            alternatives_considered=self._g(row, "alternatives_considered"),
+            phase=self._g(row, "phase"),
+        )
+
+    def _parse_runtime_component(self, row: dict[str, str]) -> RuntimeComponent:
+        return RuntimeComponent(
+            component_id=self._g(row, "component_id"),
+            name=self._g(row, "component_name"),
+            type=self._g(row, "type"),
+            technology=self._g(row, "technology"),
+            port=self._g(row, "port"),
+        )
+
+    def _parse_environment(self, row: dict[str, str]) -> Environment:
+        return Environment(
+            env_id=self._g(row, "env_id"),
+            name=self._g(row, "env_name"),
+            purpose=self._g(row, "purpose"),
+            infra=self._g(row, "infra"),
+        )
+
+    def _parse_traceability_link(self, row: dict[str, str]) -> TraceabilityLink:
+        return TraceabilityLink(
+            trace_id=self._g(row, "trace_id"),
+            capability_id=self._g(row, "capability_id"),
+            module_id=self._g(row, "module_id"),
+            entity_ids=_split_space(self._g(row, "entity_ids")),
+            value_flow_ids=_split_space(self._g(row, "value_flow_ids")),
+            notes=self._g(row, "notes"),
+        )
+
+    def _parse_change_log_entry(self, row: dict[str, str]) -> ChangeLogEntry:
+        return ChangeLogEntry(
+            change_id=self._g(row, "change_id"),
+            date=self._g(row, "date"),
+            scope=self._g(row, "scope"),
+            description=self._g(row, "description"),
+        )
+
+    def _parse_shared_term(self, row: dict[str, str]) -> SharedTerm:
+        return SharedTerm(
+            term_id=self._g(row, "term_id"),
+            term=self._g(row, "term"),
+            english_term=self._g(row, "english_term"),
+            definition=self._g(row, "definition"),
+            # NOTE(review): the "used_by_modules" column feeds used_by_domains;
+            # confirm the header name against the shared-terminology CSV.
+            used_by_domains=_split_space(self._g(row, "used_by_modules")),
+        )
+
+    def _parse_ubiquitous_term(self, row: dict[str, str]) -> UbiquitousTerm:
+        return UbiquitousTerm(
+            term_id=self._g(row, "term_id"),
+            term=self._g(row, "term"),
+            english_term=self._g(row, "english_term"),
+            code_symbol=self._g(row, "code_symbol"),
+            domain=self._g(row, "domain"),
+            definition=self._g(row, "definition"),
+        )
+
+    def _parse_scenario(self, row: dict[str, str]) -> Scenario:
+        return Scenario(
+            scenario_id=self._g(row, "scenario_id"),
+            name=self._g(row, "scenario_name"),
+            trigger=self._g(row, "trigger"),
+            actors=self._g(row, "actors"),
+            steps=self._g(row, "steps"),
+            outcome=self._g(row, "outcome"),
+            related_capabilities=_split_space(self._g(row, "related_capabilities")),
+        )
+
+    def _parse_domain_module(self, row: dict[str, str]) -> DomainModule:
+        return DomainModule(
+            module_id=self._g(row, "module_id"),
+            module_name=self._g(row, "module_name"),
+            domain=self._g(row, "domain"),
+            description=self._g(row, "description"),
+            layer_in_code=self._g(row, "layer_in_code"),
+        )
+
+    def _parse_domain_entity(self, row: dict[str, str]) -> DomainEntity:
+        return DomainEntity(
+            entity_id=self._g(row, "entity_id"),
+            entity_name=self._g(row, "entity_name"),
+            type=self._g(row, "type"),
+            description=self._g(row, "description"),
+            key_attributes=self._g(row, "key_attributes"),
+        )
diff --git a/backend/tests/test_scanner_parsers.py b/backend/tests/test_scanner_parsers.py
new file mode 100644
index 0000000..fb95340
--- /dev/null
+++ b/backend/tests/test_scanner_parsers.py
@@ -0,0 +1,202 @@
+"""Tests for scanner parsers (CSV, MD, YAML, OpenAPI)."""
+
+import os
+from pathlib import Path
+
+import pytest
+
+from app.modules.scanner.infrastructure.parsers.csv_parser import CsvParser
+
+# Defaults to the original workspace checkout; set DESIGN_DIR to run elsewhere.
+DESIGN_DIR = Path(os.environ.get("DESIGN_DIR", "/workspace/arch-design-agent-skill-dashboard/design"))
+
+
+@pytest.fixture
+def csv_parser():
+    return CsvParser()
+
+
+# ── CSV Parser Tests ──
+
+
+class TestCsvParserCapabilities:
+    def test_parse_capability_map(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "business-architecture" / "02-capability-map.csv")
+        assert "capabilities" in result
+        caps = result["capabilities"]
+        assert len(caps) > 0
+        cap = caps[0]
+        assert cap.capability_id.startswith("CAP-")
+        assert cap.name  # should have a name
+        assert isinstance(cap.related_value_flows, list)
+
+    def test_capability_related_value_flows_split(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "business-architecture" / "02-capability-map.csv")
+        caps = result["capabilities"]
+        # CAP-PROGRESS-DESIGN has "VF-02 VF-03" which should be split
+        progress_cap = [c for c in caps if c.capability_id == "CAP-PROGRESS-DESIGN"]
+        assert len(progress_cap) == 1
+        assert progress_cap[0].related_value_flows == ["VF-02", "VF-03"]
+
+
+class TestCsvParserModules:
+    def test_parse_modules(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "application-architecture" / "02-modules.csv")
+        assert "modules" in result
+        mods = result["modules"]
+        assert len(mods) > 0
+        mod = mods[0]
+        assert mod.module_id.startswith("MOD-")
+        assert isinstance(mod.depends_on, list)
+        assert isinstance(mod.capabilities, list)
+
+    def test_module_list_fields(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "application-architecture" / "02-modules.csv")
+        mods = result["modules"]
+        scanner = [m for m in mods if m.module_id == "MOD-SCANNER"]
+        assert len(scanner) == 1
+        assert "MOD-DESIGN" in scanner[0].depends_on
+        assert len(scanner[0].capabilities) > 0
+
+
+class TestCsvParserTraceability:
+    def test_parse_traceability(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "traceability.csv")
+        assert "traceability_links" in result
+        links = result["traceability_links"]
+        assert len(links) > 0
+        link = links[0]
+        assert link.trace_id.startswith("TR-")
+        assert isinstance(link.entity_ids, list)
+        assert isinstance(link.value_flow_ids, list)
+
+    def test_traceability_space_split(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "traceability.csv")
+        links = result["traceability_links"]
+        # TR-04 has many entity_ids space-separated
+        tr04 = [link for link in links if link.trace_id == "TR-04"]
+        assert len(tr04) == 1
+        assert len(tr04[0].entity_ids) > 5
+
+
+class TestCsvParserOtherTypes:
+    def test_parse_value_flows(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "business-architecture" / "03-value-flows.csv")
+        assert "value_flows" in result
+        assert len(result["value_flows"]) > 0
+
+    def test_parse_user_journeys(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "business-architecture" / "04-user-journeys.csv")
+        assert "user_journeys" in result
+        assert len(result["user_journeys"]) > 0
+
+    def test_parse_integrations(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "application-architecture" / "03-integrations.csv")
+        assert "integrations" in result
+        assert len(result["integrations"]) > 0
+
+    def test_parse_external_systems(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "application-architecture" / "01-external-systems.csv")
+        assert "external_systems" in result
+        assert len(result["external_systems"]) > 0
+
+    def test_parse_entities(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "data-architecture" / "01-entities.csv")
+        assert "entities" in result
+        assert len(result["entities"]) > 0
+
+    def test_parse_data_flows(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "data-architecture" / "02-data-flows.csv")
+        assert "data_flows" in result
+        assert len(result["data_flows"]) > 0
+
+    def test_parse_data_security(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "data-architecture" / "03-data-security.csv")
+        assert "data_securities" in result
+        assert len(result["data_securities"]) > 0
+
+    def test_parse_tech_selections(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "technology-architecture" / "00-technology-selection.csv")
+        assert "tech_selections" in result
+        assert len(result["tech_selections"]) > 0
+
+    def test_parse_runtime_components(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "technology-architecture" / "01-runtime-components.csv")
+        assert "runtime_components" in result
+        assert len(result["runtime_components"]) > 0
+
+    def test_parse_environments(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "technology-architecture" / "02-environments.csv")
+        assert "environments" in result
+        assert len(result["environments"]) > 0
+
+    def test_parse_change_log(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "change-log.csv")
+        assert "change_log_entries" in result
+        assert len(result["change_log_entries"]) > 0
+
+    def test_parse_shared_terminology(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "_shared" / "01-shared-terminology.csv")
+        assert "shared_terms" in result
+        terms = result["shared_terms"]
+        assert len(terms) > 0
+        # Check used_by_domains is a list (space-split)
+        assert isinstance(terms[0].used_by_domains, list)
+        assert len(terms[0].used_by_domains) > 0
+
+    def test_parse_ubiquitous_language(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "design" / "02-ubiquitous-language.csv")
+        assert "ubiquitous_terms" in result
+        assert len(result["ubiquitous_terms"]) > 0
+
+    def test_parse_scenarios(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "design" / "03-scenarios-and-flows.csv")
+        assert "scenarios" in result
+        assert len(result["scenarios"]) > 0
+
+    def test_parse_domain_modules(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "design" / "04-domain-modules.csv")
+        assert "domain_modules" in result
+        assert len(result["domain_modules"]) > 0
+
+    def test_parse_domain_entities(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "design" / "05-domain-entities.csv")
+        assert "domain_entities" in result
+        assert len(result["domain_entities"]) > 0
+
+    def test_parse_codebase_alignment(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "application-architecture" / "06-codebase-alignment.csv")
+        assert "codebase_alignments" in result
+        assert len(result["codebase_alignments"]) > 0
+
+    def test_parse_codebase_mapping(self, csv_parser):
+        result = csv_parser.parse(DESIGN_DIR / "domains" / "design" / "07-codebase-mapping.csv")
+        assert "codebase_alignments" in result
+        assert len(result["codebase_alignments"]) > 0
+
+
+class TestCsvParserUnknown:
+    def test_unknown_csv_returns_empty(self, csv_parser, tmp_path):
+        unknown = tmp_path / "unknown-file.csv"
+        unknown.write_text("col1,col2\nval1,val2\n", encoding="utf-8")
+        result = csv_parser.parse(unknown)
+        assert result == {}
+
+    def test_nonexistent_file_returns_empty(self, csv_parser):
+        result = csv_parser.parse(Path("/nonexistent/file.csv"))
+        assert result == {}
+
+
+class TestCsvParserEncoding:
+    def test_utf8_bom_is_stripped_from_first_header(self, csv_parser, tmp_path, monkeypatch):
+        captured = []
+
+        def fake_dispatch(fname, stem, rows):
+            captured.extend(rows)
+            return {}
+
+        monkeypatch.setattr(csv_parser, "_dispatch", fake_dispatch)
+        bom_file = tmp_path / "02-capability-map.csv"
+        bom_file.write_bytes(b"\xef\xbb\xbf" + b"capability_id,capability_name\nCAP-X,Example\n")
+        csv_parser.parse(bom_file)
+        assert captured == [{"capability_id": "CAP-X", "capability_name": "Example"}]