arch-design-agent-skill-das.../backend/app/modules/graph/application/services.py
openclaw 4d70df76fc feat(graph): map node status from FileStatus via source_file (GAP-B1)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 08:19:40 +00:00

175 lines
6.9 KiB
Python

"""GraphService — builds a relationship graph from ScanResult entities."""
from __future__ import annotations
from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView
from app.modules.scanner.domain.entities import ScanResult
# Fixed set of groups
_GROUPS = [
GraphGroup(id="business", label="Business", layer="business"),
GraphGroup(id="application", label="Application", layer="application"),
GraphGroup(id="data", label="Data", layer="data"),
GraphGroup(id="technology", label="Technology", layer="technology"),
GraphGroup(id="cross-layer", label="Cross-Layer", layer="cross-layer"),
]
_SOURCE_FILES: dict[str, str] = {
"capability": "business-architecture/02-capability-map.csv",
"module": "application-architecture/02-modules.csv",
"entity": "data-architecture/01-entities.csv",
"runtime_component": "technology-architecture/01-runtime-components.csv",
}
class GraphService:
"""Constructs a panorama graph and supports neighbor queries."""
def build_panorama(self, scan_result: ScanResult, *, design_dir: str = "") -> GraphView:
"""Build a full panorama GraphView from a ScanResult (9-step algorithm)."""
nodes: list[GraphNode] = []
edges: list[GraphEdge] = []
node_ids: set[str] = set()
# Build file-status lookup from ScanResult
file_status_map: dict[str, str] = {
fs.path: fs.status.value for fs in scan_result.file_statuses
}
# Step 1: groups are always the fixed 5
groups = list(_GROUPS)
# Step 2: Capability → node(type="capability", group="business")
for cap in scan_result.capabilities:
node_id = cap.capability_id
nodes.append(GraphNode(
id=node_id,
type="capability",
label=cap.name,
status=file_status_map.get(_SOURCE_FILES["capability"], "unknown"),
group_id="business",
))
node_ids.add(node_id)
# Step 3: Module → node(type="module", group="application")
for mod in scan_result.modules:
node_id = mod.module_id
nodes.append(GraphNode(
id=node_id,
type="module",
label=mod.name,
status=file_status_map.get(_SOURCE_FILES["module"], "unknown"),
group_id="application",
))
node_ids.add(node_id)
# Step 4: Entity → node(type="entity", group="data")
for ent in scan_result.entities:
node_id = ent.entity_id
nodes.append(GraphNode(
id=node_id,
type="entity",
label=ent.name,
status=file_status_map.get(_SOURCE_FILES["entity"], "unknown"),
group_id="data",
))
node_ids.add(node_id)
# Step 5: RuntimeComponent → node(type="runtime_component", group="technology")
for rc in scan_result.runtime_components:
node_id = rc.component_id
nodes.append(GraphNode(
id=node_id,
type="runtime_component",
label=rc.name,
status=file_status_map.get(_SOURCE_FILES["runtime_component"], "unknown"),
group_id="technology",
))
node_ids.add(node_id)
# Step 6: TraceabilityLink → edges
for link in scan_result.traceability_links:
# capability_id → module_id
if link.capability_id in node_ids and link.module_id in node_ids:
edges.append(GraphEdge(
source=link.capability_id,
target=link.module_id,
relation="traces_to",
))
# module_id → each entity_id
for entity_id in link.entity_ids:
if link.module_id in node_ids and entity_id in node_ids:
edges.append(GraphEdge(
source=link.module_id,
target=entity_id,
relation="traces_to",
))
# Step 7: Integration → edges: source_id → target_id
for intg in scan_result.integrations:
if intg.source_id in node_ids and intg.target_id in node_ids:
edges.append(GraphEdge(
source=intg.source_id,
target=intg.target_id,
relation="integrates_with",
))
# Step 8: Module.depends_on → edges
for mod in scan_result.modules:
for dep_id in mod.depends_on:
if mod.module_id in node_ids and dep_id in node_ids:
edges.append(GraphEdge(
source=mod.module_id,
target=dep_id,
relation="depends_on",
))
# Step 9: DesignDocument.upstream/downstream → edges (if both are nodes)
for doc in scan_result.design_documents:
for upstream_id in doc.upstream:
if doc.doc_id in node_ids and upstream_id in node_ids:
edges.append(GraphEdge(
source=doc.doc_id,
target=upstream_id,
relation="documents",
))
for downstream_id in doc.downstream:
if doc.doc_id in node_ids and downstream_id in node_ids:
edges.append(GraphEdge(
source=doc.doc_id,
target=downstream_id,
relation="documents",
))
return GraphView(nodes=nodes, edges=edges, groups=groups)
def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView:
"""Return a subgraph containing the given node and all its direct neighbors."""
# Check if node_id exists
node_exists = any(n.id == node_id for n in graph_view.nodes)
if not node_exists:
return GraphView(nodes=[], edges=[], groups=[])
# Find all edges where source==node_id or target==node_id
relevant_edges = [
e for e in graph_view.edges
if e.source == node_id or e.target == node_id
]
# Collect all neighbor node IDs from those edges + the target node itself
neighbor_ids: set[str] = {node_id}
for edge in relevant_edges:
neighbor_ids.add(edge.source)
neighbor_ids.add(edge.target)
# Filter nodes
relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids]
# Filter groups to only those referenced by relevant nodes
relevant_group_ids = {n.group_id for n in relevant_nodes}
relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids]
return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups)