feat(scanner): add Markdown parser — frontmatter extraction and specialized entity mapping
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
b64eb8aa06
commit
51c6ba97fc
|
|
@ -0,0 +1,160 @@
|
|||
"""Markdown parser — extracts YAML frontmatter and produces DesignDocument + specialized entities."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from app.modules.design.domain.entities import (
|
||||
ADR,
|
||||
DesignDocument,
|
||||
Domain,
|
||||
ModuleBoundaryRule,
|
||||
OperationalBaseline,
|
||||
ReleasePlan,
|
||||
RuntimeTopology,
|
||||
ScopeAndGoals,
|
||||
SolutionLayer,
|
||||
SystemContext,
|
||||
)
|
||||
|
||||
|
||||
# Matches a YAML frontmatter block anchored at the very start of the text:
# an opening "---" line, the YAML payload (captured as group 1), and a closing
# "---" line. The closing delimiter may be followed by a newline or by the end
# of the text, so a file that ends right after its frontmatter still matches.
_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*(?:\n|\Z)", re.DOTALL)
|
||||
|
||||
|
||||
class MdParser:
    """Parse a Markdown file and map it to design entities.

    ``parse`` returns a dict whose keys are entity type names and whose values
    are lists of entity instances. ``'design_documents'`` is always present on
    success; at most one specialized key is added, chosen from the file name:
    ``'scope_and_goals'``, ``'system_context'``, ``'solution_layer'``,
    ``'module_boundary_rule'``, ``'runtime_topology'``,
    ``'operational_baseline'``, ``'release_plan'``, ``'domains'`` or ``'adrs'``.
    """

    def parse(self, file_path: Path) -> dict[str, list[Any]]:
        """Read ``file_path`` and build entities from its YAML frontmatter.

        Args:
            file_path: Path of the Markdown file to parse.

        Returns:
            A mapping of entity type name to a list of instances. An empty
            dict is returned when the file cannot be read, has no leading
            frontmatter block, the frontmatter is not valid YAML or not a
            mapping, or ``doc_id`` is missing or empty.
        """
        # Parsing is best-effort: any read failure means "nothing to report".
        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception:
            return {}

        match = _FRONTMATTER_RE.match(content)
        if not match:
            return {}

        # Malformed YAML is treated the same way as a missing frontmatter block.
        try:
            frontmatter = yaml.safe_load(match.group(1))
        except Exception:
            return {}

        if not isinstance(frontmatter, dict):
            return {}

        doc_id = frontmatter.get("doc_id", "")
        if not doc_id:
            return {}

        title = frontmatter.get("title", "")
        status = frontmatter.get("status", "")

        design_doc = DesignDocument(
            doc_id=doc_id,
            title=title,
            # YAML may load the version as a number (e.g. 1.0); store it as text.
            version=str(frontmatter.get("version", "")),
            status=status,
            owners=self._as_list(frontmatter.get("owners")),
            upstream=self._as_list(frontmatter.get("upstream")),
            downstream=self._as_list(frontmatter.get("downstream")),
            file_path=str(file_path),
        )
        result: dict[str, list[Any]] = {"design_documents": [design_doc]}

        # Everything after the frontmatter block is the document body.
        body = content[match.end():].strip()
        result.update(
            self._specialized_entities(file_path, doc_id, title, status, body)
        )
        return result

    @staticmethod
    def _as_list(value: Any) -> list[Any]:
        """Return ``value`` as a list: falsy becomes ``[]``, a scalar is wrapped."""
        if not value:
            return []
        return value if isinstance(value, list) else [value]

    @staticmethod
    def _specialized_entities(
        file_path: Path,
        doc_id: Any,
        title: Any,
        status: Any,
        body: str,
    ) -> dict[str, list[Any]]:
        """Build the single specialized entity implied by the file name, if any.

        Detection is by substring of the lower-cased file name; each slug is
        accepted in both its hyphenated and underscored spelling. The first
        matching rule wins, and an empty dict is returned when none matches.
        """
        fname = file_path.name.lower()

        def named(slug: str) -> bool:
            return slug in fname or slug.replace("-", "_") in fname

        if named("scope-and-goals"):
            return {
                "scope_and_goals": [
                    ScopeAndGoals(
                        doc_id=doc_id,
                        title=title,
                        core_problem="",
                        users="",
                        constraints="",
                    )
                ]
            }

        # Entities that share the (doc_id, title, content) constructor shape.
        # Built at call time so the order below is the matching priority.
        content_entities = (
            ("system-context", "system_context", SystemContext),
            ("solution-layering", "solution_layer", SolutionLayer),
            ("module-boundary", "module_boundary_rule", ModuleBoundaryRule),
            ("runtime-topology", "runtime_topology", RuntimeTopology),
            ("operational-baseline", "operational_baseline", OperationalBaseline),
            ("release-and-rollback", "release_plan", ReleasePlan),
        )
        for slug, key, entity_cls in content_entities:
            if named(slug):
                return {key: [entity_cls(doc_id=doc_id, title=title, content=body)]}

        if named("domain-overview"):
            # The domain name is taken from the file's parent directory.
            return {
                "domains": [
                    Domain(
                        domain_name=file_path.parent.name,
                        overview=body,
                        modules=[],
                        entities=[],
                    )
                ]
            }

        # ADR files are named "adr-..."; the ADR template file is excluded.
        if fname.startswith("adr-") and "template" not in fname:
            return {
                "adrs": [
                    ADR(
                        adr_id=doc_id,
                        title=title,
                        status=status,
                        context=body,
                        decision="",
                    )
                ]
            }

        return {}
|
||||
|
|
@ -183,3 +183,119 @@ class TestCsvParserUnknown:
|
|||
def test_nonexistent_file_returns_empty(self, csv_parser):
    """Parsing a path that does not exist yields an empty mapping."""
    missing_csv = Path("/nonexistent/file.csv")
    assert csv_parser.parse(missing_csv) == {}
|
||||
|
||||
|
||||
# ── MD Parser Tests ──
|
||||
|
||||
from app.modules.scanner.infrastructure.parsers.md_parser import MdParser
|
||||
|
||||
|
||||
@pytest.fixture
def md_parser():
    """Provide a new ``MdParser`` instance to the requesting test."""
    parser = MdParser()
    return parser
|
||||
|
||||
|
||||
class TestMdParserScopeAndGoals:
    """Checks the mapping of the scope-and-goals document."""

    def test_parse_scope_and_goals(self, md_parser):
        """One DesignDocument and one ScopeAndGoals are produced with the same doc_id."""
        source = DESIGN_DIR / "business-architecture" / "01-scope-and-goals.md"
        parsed = md_parser.parse(source)

        # Generic document built from the frontmatter.
        assert "design_documents" in parsed
        design_docs = parsed["design_documents"]
        assert len(design_docs) == 1
        document = design_docs[0]
        assert document.doc_id == "DOC-BA-001"
        assert document.title == "范围与目标"
        assert isinstance(document.owners, list)
        assert isinstance(document.downstream, list)

        # Specialized entity selected by the file name.
        assert "scope_and_goals" in parsed
        scope_entities = parsed["scope_and_goals"]
        assert len(scope_entities) == 1
        assert scope_entities[0].doc_id == "DOC-BA-001"
|
||||
|
||||
|
||||
class TestMdParserDomainOverview:
    """Checks the mapping of a domain-overview document."""

    def test_parse_domain_overview(self, md_parser):
        """A Domain named after the parent directory is produced when frontmatter exists.

        The expectation depends on whether the repository file starts with a
        frontmatter delimiter, so the file is inspected before asserting.
        """
        overview_path = DESIGN_DIR / "domains" / "design" / "01-domain-overview.md"
        result = md_parser.parse(overview_path)

        # Decode explicitly as UTF-8 (the encoding the parser uses) instead of
        # relying on the platform's locale default.
        content = overview_path.read_text(encoding="utf-8")
        if content.startswith("---"):
            assert "design_documents" in result
            assert "domains" in result
            assert result["domains"][0].domain_name == "design"
        else:
            # Without frontmatter the parser produces nothing at all.
            assert result == {}
|
||||
|
||||
|
||||
class TestMdParserSystemContext:
    """Checks the mapping of the system-context document."""

    def test_parse_system_context(self, md_parser):
        """A single SystemContext with id, title and non-empty content is produced."""
        source = DESIGN_DIR / "application-architecture" / "01-system-context.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "system_context"):
            assert expected_key in parsed

        contexts = parsed["system_context"]
        assert len(contexts) == 1
        context = contexts[0]
        assert context.doc_id == "DOC-AA-001"
        assert context.title == "系统上下文"
        assert len(context.content) > 0
|
||||
|
||||
|
||||
class TestMdParserAdrTemplate:
    """Checks that the ADR template file never yields an ADR entity."""

    def test_adr_template_no_adr_entity(self, md_parser):
        """The template gives an empty result, or at least no 'adrs' key."""
        template_path = DESIGN_DIR / "adr" / "ADR-000-template.md"
        result = md_parser.parse(template_path)

        # Decode explicitly as UTF-8 (the encoding the parser uses) instead of
        # relying on the platform's locale default.
        content = template_path.read_text(encoding="utf-8")
        if not content.startswith("---"):
            # No frontmatter: the parser produces nothing at all.
            assert result == {}
        else:
            # With frontmatter the file is still a template, so no ADR entity.
            assert "adrs" not in result
|
||||
|
||||
|
||||
class TestMdParserNoFrontmatter:
    """Checks the inputs for which the parser must return an empty mapping."""

    def test_no_frontmatter_returns_empty(self, md_parser, tmp_path):
        """A Markdown file without a frontmatter block yields an empty mapping."""
        plain_md = tmp_path / "test.md"
        plain_md.write_text("# Just a heading\n\nSome content.\n")
        assert md_parser.parse(plain_md) == {}

    def test_nonexistent_md_returns_empty(self, md_parser):
        """A path that does not exist yields an empty mapping."""
        missing_md = Path("/nonexistent/file.md")
        assert md_parser.parse(missing_md) == {}
|
||||
|
||||
|
||||
class TestMdParserSolutionLayering:
    """Checks the mapping of the solution-layering document."""

    def test_parse_solution_layering(self, md_parser):
        """A single SolutionLayer carrying the document's doc_id is produced."""
        source = DESIGN_DIR / "application-architecture" / "02b-solution-layering.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "solution_layer"):
            assert expected_key in parsed

        layers = parsed["solution_layer"]
        assert len(layers) == 1
        assert layers[0].doc_id == "DOC-AA-003"
|
||||
|
||||
|
||||
class TestMdParserModuleBoundary:
    """Checks the mapping of the module-boundary-rules document."""

    def test_parse_module_boundary(self, md_parser):
        """Both the generic document and the module boundary rule key are present."""
        source = DESIGN_DIR / "application-architecture" / "07-module-boundary-rules.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "module_boundary_rule"):
            assert expected_key in parsed
|
||||
|
||||
|
||||
class TestMdParserRuntimeTopology:
    """Checks the mapping of the runtime-topology document."""

    def test_parse_runtime_topology(self, md_parser):
        """Both the generic document and the runtime topology key are present."""
        source = DESIGN_DIR / "technology-architecture" / "01-runtime-topology.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "runtime_topology"):
            assert expected_key in parsed
|
||||
|
||||
|
||||
class TestMdParserOperationalBaseline:
    """Checks the mapping of the operational-baseline document."""

    def test_parse_operational_baseline(self, md_parser):
        """Both the generic document and the operational baseline key are present."""
        source = DESIGN_DIR / "technology-architecture" / "03-operational-baseline.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "operational_baseline"):
            assert expected_key in parsed
|
||||
|
||||
|
||||
class TestMdParserReleasePlan:
    """Checks the mapping of the release-and-rollback document."""

    def test_parse_release_plan(self, md_parser):
        """Both the generic document and the release plan key are present."""
        source = DESIGN_DIR / "technology-architecture" / "04-release-and-rollback.md"
        parsed = md_parser.parse(source)

        for expected_key in ("design_documents", "release_plan"):
            assert expected_key in parsed
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user