feat(scanner): add YAML and OpenAPI parsers
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
51c6ba97fc
commit
699e2ad919
|
|
@ -0,0 +1,62 @@
|
||||||
|
"""OpenAPI parser — extracts ApiContract entities from OpenAPI YAML specifications."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from app.modules.design.domain.entities import ApiContract
|
||||||
|
|
||||||
|
|
||||||
|
class OpenapiParser:
|
||||||
|
"""Parse OpenAPI YAML file and return dict mapping entity type name to list of instances.
|
||||||
|
|
||||||
|
Returns {'api_contracts': [ApiContract, ...]}
|
||||||
|
"""
|
||||||
|
|
||||||
|
def parse(self, file_path: Path) -> dict[str, list[Any]]:
|
||||||
|
try:
|
||||||
|
with open(file_path, encoding="utf-8") as f:
|
||||||
|
spec = yaml.safe_load(f)
|
||||||
|
except Exception:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
if not isinstance(spec, dict) or "paths" not in spec:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
contracts: list[ApiContract] = []
|
||||||
|
paths = spec["paths"]
|
||||||
|
if not isinstance(paths, dict):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
for path, path_item in paths.items():
|
||||||
|
if not isinstance(path_item, dict):
|
||||||
|
continue
|
||||||
|
for method, operation in path_item.items():
|
||||||
|
# Skip non-HTTP-method keys (e.g., 'parameters', 'summary')
|
||||||
|
if method.lower() not in (
|
||||||
|
"get", "post", "put", "delete", "patch", "options", "head", "trace",
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
if not isinstance(operation, dict):
|
||||||
|
continue
|
||||||
|
|
||||||
|
operation_id = operation.get("operationId", "")
|
||||||
|
summary = operation.get("summary", "")
|
||||||
|
|
||||||
|
doc_id = f"API-{operation_id or method.upper()}-{path}"
|
||||||
|
|
||||||
|
contracts.append(ApiContract(
|
||||||
|
doc_id=doc_id,
|
||||||
|
path=path,
|
||||||
|
method=method.upper(),
|
||||||
|
operation_id=operation_id or "",
|
||||||
|
summary=summary or "",
|
||||||
|
))
|
||||||
|
|
||||||
|
if not contracts:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
return {"api_contracts": contracts}
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
"""YAML parser — simple wrapper around yaml.safe_load for configuration files."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
class YamlParser:
|
||||||
|
"""Load a YAML file and return its contents as a Python dict/list."""
|
||||||
|
|
||||||
|
def load(self, file_path: Path) -> Any:
|
||||||
|
try:
|
||||||
|
with open(file_path, encoding="utf-8") as f:
|
||||||
|
return yaml.safe_load(f)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
@ -299,3 +299,78 @@ class TestMdParserReleasePlan:
|
||||||
result = md_parser.parse(DESIGN_DIR / "technology-architecture" / "04-release-and-rollback.md")
|
result = md_parser.parse(DESIGN_DIR / "technology-architecture" / "04-release-and-rollback.md")
|
||||||
assert "design_documents" in result
|
assert "design_documents" in result
|
||||||
assert "release_plan" in result
|
assert "release_plan" in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── YAML Parser Tests ──
|
||||||
|
|
||||||
|
from app.modules.scanner.infrastructure.parsers.yaml_parser import YamlParser
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def yaml_parser():
|
||||||
|
return YamlParser()
|
||||||
|
|
||||||
|
|
||||||
|
class TestYamlParser:
|
||||||
|
def test_load_openapi_yaml(self, yaml_parser):
|
||||||
|
data = yaml_parser.load(
|
||||||
|
DESIGN_DIR / "application-architecture" / "04-api-contracts.openapi.yaml"
|
||||||
|
)
|
||||||
|
assert data is not None
|
||||||
|
assert "openapi" in data
|
||||||
|
assert "paths" in data
|
||||||
|
|
||||||
|
def test_load_nonexistent_returns_none(self, yaml_parser):
|
||||||
|
result = yaml_parser.load(Path("/nonexistent/file.yaml"))
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_load_plain_yaml(self, yaml_parser, tmp_path):
|
||||||
|
f = tmp_path / "test.yaml"
|
||||||
|
f.write_text("key: value\nlist:\n - one\n - two\n")
|
||||||
|
data = yaml_parser.load(f)
|
||||||
|
assert data == {"key": "value", "list": ["one", "two"]}
|
||||||
|
|
||||||
|
|
||||||
|
# ── OpenAPI Parser Tests ──
|
||||||
|
|
||||||
|
from app.modules.scanner.infrastructure.parsers.openapi_parser import OpenapiParser
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def openapi_parser():
|
||||||
|
return OpenapiParser()
|
||||||
|
|
||||||
|
|
||||||
|
class TestOpenapiParser:
|
||||||
|
def test_parse_api_contracts(self, openapi_parser):
|
||||||
|
result = openapi_parser.parse(
|
||||||
|
DESIGN_DIR / "application-architecture" / "04-api-contracts.openapi.yaml"
|
||||||
|
)
|
||||||
|
assert "api_contracts" in result
|
||||||
|
contracts = result["api_contracts"]
|
||||||
|
assert len(contracts) > 0
|
||||||
|
# Check that contracts have correct fields
|
||||||
|
contract = contracts[0]
|
||||||
|
assert contract.path.startswith("/")
|
||||||
|
assert contract.method in ("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD")
|
||||||
|
assert contract.doc_id.startswith("API-")
|
||||||
|
|
||||||
|
def test_parse_health_endpoint(self, openapi_parser):
|
||||||
|
result = openapi_parser.parse(
|
||||||
|
DESIGN_DIR / "application-architecture" / "04-api-contracts.openapi.yaml"
|
||||||
|
)
|
||||||
|
contracts = result["api_contracts"]
|
||||||
|
health = [c for c in contracts if c.path == "/api/health"]
|
||||||
|
assert len(health) == 1
|
||||||
|
assert health[0].method == "GET"
|
||||||
|
assert health[0].operation_id == "healthCheck"
|
||||||
|
|
||||||
|
def test_parse_nonexistent_returns_empty(self, openapi_parser):
|
||||||
|
result = openapi_parser.parse(Path("/nonexistent/file.yaml"))
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
def test_parse_non_openapi_yaml_returns_empty(self, openapi_parser, tmp_path):
|
||||||
|
f = tmp_path / "not-openapi.yaml"
|
||||||
|
f.write_text("key: value\n")
|
||||||
|
result = openapi_parser.parse(f)
|
||||||
|
assert result == {}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user