- Create document nodes from scan_result.design_documents with type="document" and group_id="cross-layer" - Set parent field on entity nodes (capability/module/entity/runtime_component) pointing to their containing document via directory-based lookup - Replace dead-code Step 9 with path-based resolution of downstream refs and edge deduplication - Add helper functions _to_rel_path and _resolve_ref_path for absolute→relative path conversion and ../ resolution Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
238 lines
9.4 KiB
Python
238 lines
9.4 KiB
Python
"""GraphService — builds a relationship graph from ScanResult entities."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import PurePosixPath
|
|
|
|
from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView
|
|
from app.modules.scanner.domain.entities import ScanResult
|
|
|
|
|
|
def _to_rel_path(doc_file_path: str, design_dir: str) -> str:
|
|
"""Convert absolute doc.file_path to design-dir-relative path."""
|
|
try:
|
|
return str(PurePosixPath(doc_file_path).relative_to(design_dir))
|
|
except ValueError:
|
|
return doc_file_path
|
|
|
|
|
|
def _resolve_ref_path(ref_path: str, doc_rel_path: str) -> str:
|
|
"""Resolve a relative upstream/downstream ref against the doc's directory."""
|
|
doc_dir = str(PurePosixPath(doc_rel_path).parent)
|
|
resolved = str(PurePosixPath(doc_dir) / ref_path)
|
|
parts: list[str] = []
|
|
for part in PurePosixPath(resolved).parts:
|
|
if part == '..':
|
|
if parts:
|
|
parts.pop()
|
|
else:
|
|
parts.append(part)
|
|
return str(PurePosixPath(*parts)) if parts else ""
|
|
|
|
|
|
# Fixed set of groups: one per architecture layer plus the cross-layer bucket.
# Each group's ``layer`` is the same string as its ``id``.
_GROUPS = [
    GraphGroup(id=group_id, label=group_label, layer=group_id)
    for group_id, group_label in (
        ("business", "Business"),
        ("application", "Application"),
        ("data", "Data"),
        ("technology", "Technology"),
        ("cross-layer", "Cross-Layer"),
    )
]
|
|
|
|
|
|
_SOURCE_FILES: dict[str, str] = {
|
|
"capability": "business-architecture/02-capability-map.csv",
|
|
"module": "application-architecture/02-modules.csv",
|
|
"entity": "data-architecture/01-entities.csv",
|
|
"runtime_component": "technology-architecture/01-runtime-components.csv",
|
|
}
|
|
|
|
|
|
class GraphService:
    """Constructs a panorama graph and supports neighbor queries."""

    def build_panorama(self, scan_result: ScanResult, *, design_dir: str = "") -> GraphView:
        """Build a full panorama GraphView from a ScanResult (9-step algorithm).

        Nodes are emitted in this order: documents, capabilities, modules,
        entities, runtime components. Edges follow in this order: traceability
        links, integrations, module dependencies, document downstream refs.
        An edge is only emitted when both of its endpoints are known nodes.

        Args:
            scan_result: Scan output providing the design documents, the four
                entity collections, traceability links, integrations and
                per-file statuses.
            design_dir: Directory that document paths are made relative to
                before they are matched against file statuses and refs.

        Returns:
            A GraphView holding all nodes, all edges and the fixed group list.
        """
        nodes: list[GraphNode] = []
        edges: list[GraphEdge] = []
        node_ids: set[str] = set()

        # File path -> status value, used to stamp a status on every node.
        file_status_map: dict[str, str] = {
            fs.path: fs.status.value for fs in scan_result.file_statuses
        }

        # Step 1: groups are always the fixed 5 (copied so callers cannot
        # mutate the module-level list through the returned view).
        groups = list(_GROUPS)

        # Step 1.5: document nodes come first; Steps 2-5 need them as parents
        # and Step 9 needs the path lookups built here.
        file_to_doc: dict[str, str] = {}
        dir_to_doc: dict[str, str] = {}
        doc_rel_paths: list[str] = []  # parallel to scan_result.design_documents
        for doc in scan_result.design_documents:
            doc_rel = _to_rel_path(doc.file_path, design_dir)
            doc_rel_paths.append(doc_rel)
            file_to_doc[doc_rel] = doc.doc_id
            # The first document found in a directory represents that
            # directory for parent lookups by CSV path.
            dir_to_doc.setdefault(str(PurePosixPath(doc_rel).parent), doc.doc_id)
            nodes.append(GraphNode(
                id=doc.doc_id,
                type="document",
                label=doc.title or doc.doc_id,
                status=file_status_map.get(doc_rel, "unknown"),
                group_id="cross-layer",
            ))
            node_ids.add(doc.doc_id)

        def _add_entity_nodes(
            entity_type: str, group_id: str, items: list[tuple[str, str]]
        ) -> None:
            """Append one node per (id, label) pair for the given entity type."""
            source_file = _SOURCE_FILES[entity_type]
            # Status and parent depend only on the entity type's source CSV,
            # so they are looked up once rather than once per node.
            status = file_status_map.get(source_file, "unknown")
            parent = file_to_doc.get(source_file) or dir_to_doc.get(
                str(PurePosixPath(source_file).parent)
            )
            for node_id, label in items:
                nodes.append(GraphNode(
                    id=node_id,
                    type=entity_type,
                    label=label,
                    status=status,
                    group_id=group_id,
                    parent=parent,
                ))
                node_ids.add(node_id)

        # Step 2: Capability → node(type="capability", group="business")
        _add_entity_nodes(
            "capability", "business",
            [(cap.capability_id, cap.name) for cap in scan_result.capabilities],
        )
        # Step 3: Module → node(type="module", group="application")
        _add_entity_nodes(
            "module", "application",
            [(mod.module_id, mod.name) for mod in scan_result.modules],
        )
        # Step 4: Entity → node(type="entity", group="data")
        _add_entity_nodes(
            "entity", "data",
            [(ent.entity_id, ent.name) for ent in scan_result.entities],
        )
        # Step 5: RuntimeComponent → node(type="runtime_component", group="technology")
        _add_entity_nodes(
            "runtime_component", "technology",
            [(rc.component_id, rc.name) for rc in scan_result.runtime_components],
        )

        # Step 6: TraceabilityLink → edges
        for link in scan_result.traceability_links:
            module_known = link.module_id in node_ids
            # capability_id → module_id
            if module_known and link.capability_id in node_ids:
                edges.append(GraphEdge(
                    source=link.capability_id,
                    target=link.module_id,
                    relation="traces_to",
                ))
            # module_id → each entity_id
            for entity_id in link.entity_ids:
                if module_known and entity_id in node_ids:
                    edges.append(GraphEdge(
                        source=link.module_id,
                        target=entity_id,
                        relation="traces_to",
                    ))

        # Step 7: Integration → edges: source_id → target_id
        for intg in scan_result.integrations:
            if intg.source_id in node_ids and intg.target_id in node_ids:
                edges.append(GraphEdge(
                    source=intg.source_id,
                    target=intg.target_id,
                    relation="integrates_with",
                ))

        # Step 8: Module.depends_on → edges. Every module was registered as a
        # node in Step 3, so only the dependency target needs checking.
        for mod in scan_result.modules:
            for dep_id in mod.depends_on:
                if dep_id in node_ids:
                    edges.append(GraphEdge(
                        source=mod.module_id,
                        target=dep_id,
                        relation="depends_on",
                    ))

        # Step 9: DesignDocument.downstream → doc-to-doc edges (deduplicated).
        # Each document is paired with its *own* relative path; looking the
        # path up by doc_id would resolve refs against the wrong file when two
        # documents share an id.
        seen_edges: set[tuple[str, str]] = set()
        for doc, doc_rel in zip(scan_result.design_documents, doc_rel_paths):
            for down_path in doc.downstream:
                # file_to_doc only holds ids that were added as document nodes.
                down_doc_id = file_to_doc.get(_resolve_ref_path(down_path, doc_rel))
                if not down_doc_id:
                    continue
                edge_key = (doc.doc_id, down_doc_id)
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)
                edges.append(GraphEdge(
                    source=doc.doc_id,
                    target=down_doc_id,
                    relation="documents",
                ))

        return GraphView(nodes=nodes, edges=edges, groups=groups)

    def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView:
        """Return a subgraph containing the given node and all its direct neighbors.

        Args:
            graph_view: Graph to search.
            node_id: Id of the node whose neighborhood is requested.

        Returns:
            A GraphView with the node, every node sharing an edge with it,
            those edges, and only the groups the kept nodes belong to. An
            empty GraphView is returned when ``node_id`` is not in the graph.
        """
        if not any(n.id == node_id for n in graph_view.nodes):
            return GraphView(nodes=[], edges=[], groups=[])

        # Edges touching the node in either direction.
        relevant_edges = [
            e for e in graph_view.edges
            if e.source == node_id or e.target == node_id
        ]

        # The node itself plus both endpoints of every touching edge.
        neighbor_ids: set[str] = {node_id}
        for edge in relevant_edges:
            neighbor_ids.update((edge.source, edge.target))

        relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids]

        # Keep only the groups referenced by the surviving nodes.
        relevant_group_ids = {n.group_id for n in relevant_nodes}
        relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids]

        return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups)
|