From 4226ba8707a13baffb22d9ac1a8848ab5b4b0635 Mon Sep 17 00:00:00 2001 From: openclaw Date: Mon, 23 Mar 2026 16:49:50 +0000 Subject: [PATCH] =?UTF-8?q?feat(graph):=20add=20GraphService=20=E2=80=94?= =?UTF-8?q?=20panorama=20construction=20and=20neighbor=20query?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 --- .../app/modules/graph/application/services.py | 161 ++++++++++++++++++ backend/tests/test_graph_service.py | 82 +++++++++ 2 files changed, 243 insertions(+) create mode 100644 backend/tests/test_graph_service.py diff --git a/backend/app/modules/graph/application/services.py b/backend/app/modules/graph/application/services.py index e69de29..2a6a0cc 100644 --- a/backend/app/modules/graph/application/services.py +++ b/backend/app/modules/graph/application/services.py @@ -0,0 +1,161 @@ +"""GraphService — builds a relationship graph from ScanResult entities.""" + +from __future__ import annotations + +from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView +from app.modules.scanner.domain.entities import ScanResult + + +# Fixed set of groups +_GROUPS = [ + GraphGroup(id="business", label="Business", layer="business"), + GraphGroup(id="application", label="Application", layer="application"), + GraphGroup(id="data", label="Data", layer="data"), + GraphGroup(id="technology", label="Technology", layer="technology"), + GraphGroup(id="cross-layer", label="Cross-Layer", layer="cross-layer"), +] + + +class GraphService: + """Constructs a panorama graph and supports neighbor queries.""" + + def build_panorama(self, scan_result: ScanResult) -> GraphView: + """Build a full panorama GraphView from a ScanResult (9-step algorithm).""" + nodes: list[GraphNode] = [] + edges: list[GraphEdge] = [] + node_ids: set[str] = set() + + # Step 1: groups are always the fixed 5 + groups = list(_GROUPS) + + # Step 2: Capability → node(type="capability", group="business") + for cap in scan_result.capabilities: + node_id = cap.capability_id + nodes.append(GraphNode( + id=node_id, + type="capability", + label=cap.name, + status="unknown", + group_id="business", + )) + node_ids.add(node_id) + + # Step 3: Module → node(type="module", group="application") + for mod in scan_result.modules: + node_id = mod.module_id + nodes.append(GraphNode( + id=node_id, + type="module", + label=mod.name, + status="unknown", + group_id="application", + )) + node_ids.add(node_id) + + # Step 4: Entity → node(type="entity", group="data") + for ent in scan_result.entities: + node_id = ent.entity_id + nodes.append(GraphNode( + id=node_id, + type="entity", + label=ent.name, + status="unknown", + group_id="data", + )) + node_ids.add(node_id) + + # Step 5: RuntimeComponent → node(type="runtime_component", group="technology") + for rc in scan_result.runtime_components: + node_id = rc.component_id + nodes.append(GraphNode( + id=node_id, + type="runtime_component", + label=rc.name, + status="unknown", + group_id="technology", + )) + node_ids.add(node_id) + + # Step 6: TraceabilityLink → edges + for link in scan_result.traceability_links: + # capability_id → module_id + if link.capability_id in node_ids and link.module_id in node_ids: + edges.append(GraphEdge( + source=link.capability_id, + target=link.module_id, + relation="traces_to", + )) + # module_id → each entity_id + for entity_id in link.entity_ids: + if link.module_id in node_ids and entity_id in node_ids: + edges.append(GraphEdge( + source=link.module_id, + target=entity_id, + relation="traces_to", + )) + + # Step 7: Integration → edges: source_id → target_id + for intg in scan_result.integrations: + if intg.source_id in node_ids and intg.target_id in node_ids: + edges.append(GraphEdge( + source=intg.source_id, + target=intg.target_id, + relation="integrates_with", + )) + + # Step 8: Module.depends_on → edges + for mod in scan_result.modules: + for dep_id in mod.depends_on: + if mod.module_id in node_ids and dep_id in node_ids: + edges.append(GraphEdge( + source=mod.module_id, + target=dep_id, + relation="depends_on", + )) + + # Step 9: DesignDocument.upstream/downstream → edges (if both are nodes) + for doc in scan_result.design_documents: + for upstream_id in doc.upstream: + if doc.doc_id in node_ids and upstream_id in node_ids: + edges.append(GraphEdge( + source=doc.doc_id, + target=upstream_id, + relation="documents", + )) + for downstream_id in doc.downstream: + if doc.doc_id in node_ids and downstream_id in node_ids: + edges.append(GraphEdge( + source=doc.doc_id, + target=downstream_id, + relation="documents", + )) + + return GraphView(nodes=nodes, edges=edges, groups=groups) + + def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView: + """Return a subgraph containing the given node and all its direct neighbors.""" + # Check if node_id exists + node_exists = any(n.id == node_id for n in graph_view.nodes) + if not node_exists: + return GraphView(nodes=[], edges=[], groups=[]) + + # Find all edges where source==node_id or target==node_id + relevant_edges = [ + e for e in graph_view.edges + if e.source == node_id or e.target == node_id + ] + + # Collect all neighbor node IDs from those edges + the target node itself + neighbor_ids: set[str] = {node_id} + for edge in relevant_edges: + neighbor_ids.add(edge.source) + neighbor_ids.add(edge.target) + + # Filter nodes + relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids] + + # Filter groups to only those referenced by relevant nodes + relevant_group_ids = {n.group_id for n in relevant_nodes} + relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids] + + return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups) diff --git a/backend/tests/test_graph_service.py b/backend/tests/test_graph_service.py new file mode 100644 index 0000000..7a0a5ca --- /dev/null +++ b/backend/tests/test_graph_service.py @@ -0,0 +1,82 @@ +import pytest +from datetime import datetime +from pathlib import Path +from app.modules.project.domain.entities import Project +from app.modules.scanner.application.services import ScanService +from app.modules.graph.application.services import GraphService + + +@pytest.fixture +def scan_result(): + svc = ScanService() + project = Project( + id="test", name="test", + design_dir="/workspace/arch-design-agent-skill-dashboard/design", + code_dir=None, created_at=datetime(2026, 1, 1), + ) + return svc.scan(project) + + +@pytest.fixture +def graph_service(): + return GraphService() + + +def test_panorama_has_groups(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + group_ids = {g.id for g in view.groups} + assert "business" in group_ids + assert "application" in group_ids + assert "data" in group_ids + assert "technology" in group_ids + assert "cross-layer" in group_ids + + +def test_panorama_has_capability_nodes(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + cap_nodes = [n for n in view.nodes if n.type == "capability"] + assert len(cap_nodes) > 0 + assert all(n.group_id == "business" for n in cap_nodes) + + +def test_panorama_has_module_nodes(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + mod_nodes = [n for n in view.nodes if n.type == "module"] + assert len(mod_nodes) > 0 + assert all(n.group_id == "application" for n in mod_nodes) + + +def test_panorama_has_entity_nodes(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + ent_nodes = [n for n in view.nodes if n.type == "entity"] + assert len(ent_nodes) > 0 + assert all(n.group_id == "data" for n in ent_nodes) + + +def test_panorama_has_edges(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + assert len(view.edges) > 0 + relations = {e.relation for e in view.edges} + assert "traces_to" in relations + + +def test_panorama_depends_on_edges(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + dep_edges = [e for e in view.edges if e.relation == "depends_on"] + assert len(dep_edges) > 0 + + +def test_neighbors_returns_subgraph(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + # Use a known capability node + neighbors = graph_service.get_neighbors(view, "CAP-PROJ-REG") + assert len(neighbors.nodes) > 0 + assert any(n.id == "CAP-PROJ-REG" for n in neighbors.nodes) + assert len(neighbors.edges) > 0 + + +def test_neighbors_unknown_node(graph_service, scan_result): + view = graph_service.build_panorama(scan_result) + neighbors = graph_service.get_neighbors(view, "NONEXISTENT") + assert len(neighbors.nodes) == 0 + assert len(neighbors.edges) == 0