feat(graph): add GraphService — panorama construction and neighbor query
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
aa892ede19
commit
4226ba8707
|
|
@ -0,0 +1,161 @@
|
||||||
|
"""GraphService — builds a relationship graph from ScanResult entities."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView
|
||||||
|
from app.modules.scanner.domain.entities import ScanResult
|
||||||
|
|
||||||
|
|
||||||
|
# Fixed set of groups
|
||||||
|
_GROUPS = [
|
||||||
|
GraphGroup(id="business", label="Business", layer="business"),
|
||||||
|
GraphGroup(id="application", label="Application", layer="application"),
|
||||||
|
GraphGroup(id="data", label="Data", layer="data"),
|
||||||
|
GraphGroup(id="technology", label="Technology", layer="technology"),
|
||||||
|
GraphGroup(id="cross-layer", label="Cross-Layer", layer="cross-layer"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class GraphService:
|
||||||
|
"""Constructs a panorama graph and supports neighbor queries."""
|
||||||
|
|
||||||
|
def build_panorama(self, scan_result: ScanResult) -> GraphView:
|
||||||
|
"""Build a full panorama GraphView from a ScanResult (9-step algorithm)."""
|
||||||
|
nodes: list[GraphNode] = []
|
||||||
|
edges: list[GraphEdge] = []
|
||||||
|
node_ids: set[str] = set()
|
||||||
|
|
||||||
|
# Step 1: groups are always the fixed 5
|
||||||
|
groups = list(_GROUPS)
|
||||||
|
|
||||||
|
# Step 2: Capability → node(type="capability", group="business")
|
||||||
|
for cap in scan_result.capabilities:
|
||||||
|
node_id = cap.capability_id
|
||||||
|
nodes.append(GraphNode(
|
||||||
|
id=node_id,
|
||||||
|
type="capability",
|
||||||
|
label=cap.name,
|
||||||
|
status="unknown",
|
||||||
|
group_id="business",
|
||||||
|
))
|
||||||
|
node_ids.add(node_id)
|
||||||
|
|
||||||
|
# Step 3: Module → node(type="module", group="application")
|
||||||
|
for mod in scan_result.modules:
|
||||||
|
node_id = mod.module_id
|
||||||
|
nodes.append(GraphNode(
|
||||||
|
id=node_id,
|
||||||
|
type="module",
|
||||||
|
label=mod.name,
|
||||||
|
status="unknown",
|
||||||
|
group_id="application",
|
||||||
|
))
|
||||||
|
node_ids.add(node_id)
|
||||||
|
|
||||||
|
# Step 4: Entity → node(type="entity", group="data")
|
||||||
|
for ent in scan_result.entities:
|
||||||
|
node_id = ent.entity_id
|
||||||
|
nodes.append(GraphNode(
|
||||||
|
id=node_id,
|
||||||
|
type="entity",
|
||||||
|
label=ent.name,
|
||||||
|
status="unknown",
|
||||||
|
group_id="data",
|
||||||
|
))
|
||||||
|
node_ids.add(node_id)
|
||||||
|
|
||||||
|
# Step 5: RuntimeComponent → node(type="runtime_component", group="technology")
|
||||||
|
for rc in scan_result.runtime_components:
|
||||||
|
node_id = rc.component_id
|
||||||
|
nodes.append(GraphNode(
|
||||||
|
id=node_id,
|
||||||
|
type="runtime_component",
|
||||||
|
label=rc.name,
|
||||||
|
status="unknown",
|
||||||
|
group_id="technology",
|
||||||
|
))
|
||||||
|
node_ids.add(node_id)
|
||||||
|
|
||||||
|
# Step 6: TraceabilityLink → edges
|
||||||
|
for link in scan_result.traceability_links:
|
||||||
|
# capability_id → module_id
|
||||||
|
if link.capability_id in node_ids and link.module_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=link.capability_id,
|
||||||
|
target=link.module_id,
|
||||||
|
relation="traces_to",
|
||||||
|
))
|
||||||
|
# module_id → each entity_id
|
||||||
|
for entity_id in link.entity_ids:
|
||||||
|
if link.module_id in node_ids and entity_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=link.module_id,
|
||||||
|
target=entity_id,
|
||||||
|
relation="traces_to",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Step 7: Integration → edges: source_id → target_id
|
||||||
|
for intg in scan_result.integrations:
|
||||||
|
if intg.source_id in node_ids and intg.target_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=intg.source_id,
|
||||||
|
target=intg.target_id,
|
||||||
|
relation="integrates_with",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Step 8: Module.depends_on → edges
|
||||||
|
for mod in scan_result.modules:
|
||||||
|
for dep_id in mod.depends_on:
|
||||||
|
if mod.module_id in node_ids and dep_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=mod.module_id,
|
||||||
|
target=dep_id,
|
||||||
|
relation="depends_on",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Step 9: DesignDocument.upstream/downstream → edges (if both are nodes)
|
||||||
|
for doc in scan_result.design_documents:
|
||||||
|
for upstream_id in doc.upstream:
|
||||||
|
if doc.doc_id in node_ids and upstream_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=doc.doc_id,
|
||||||
|
target=upstream_id,
|
||||||
|
relation="documents",
|
||||||
|
))
|
||||||
|
for downstream_id in doc.downstream:
|
||||||
|
if doc.doc_id in node_ids and downstream_id in node_ids:
|
||||||
|
edges.append(GraphEdge(
|
||||||
|
source=doc.doc_id,
|
||||||
|
target=downstream_id,
|
||||||
|
relation="documents",
|
||||||
|
))
|
||||||
|
|
||||||
|
return GraphView(nodes=nodes, edges=edges, groups=groups)
|
||||||
|
|
||||||
|
def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView:
|
||||||
|
"""Return a subgraph containing the given node and all its direct neighbors."""
|
||||||
|
# Check if node_id exists
|
||||||
|
node_exists = any(n.id == node_id for n in graph_view.nodes)
|
||||||
|
if not node_exists:
|
||||||
|
return GraphView(nodes=[], edges=[], groups=[])
|
||||||
|
|
||||||
|
# Find all edges where source==node_id or target==node_id
|
||||||
|
relevant_edges = [
|
||||||
|
e for e in graph_view.edges
|
||||||
|
if e.source == node_id or e.target == node_id
|
||||||
|
]
|
||||||
|
|
||||||
|
# Collect all neighbor node IDs from those edges + the target node itself
|
||||||
|
neighbor_ids: set[str] = {node_id}
|
||||||
|
for edge in relevant_edges:
|
||||||
|
neighbor_ids.add(edge.source)
|
||||||
|
neighbor_ids.add(edge.target)
|
||||||
|
|
||||||
|
# Filter nodes
|
||||||
|
relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids]
|
||||||
|
|
||||||
|
# Filter groups to only those referenced by relevant nodes
|
||||||
|
relevant_group_ids = {n.group_id for n in relevant_nodes}
|
||||||
|
relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids]
|
||||||
|
|
||||||
|
return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups)
|
||||||
82
backend/tests/test_graph_service.py
Normal file
82
backend/tests/test_graph_service.py
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
import pytest
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from app.modules.project.domain.entities import Project
|
||||||
|
from app.modules.scanner.application.services import ScanService
|
||||||
|
from app.modules.graph.application.services import GraphService
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def scan_result():
|
||||||
|
svc = ScanService()
|
||||||
|
project = Project(
|
||||||
|
id="test", name="test",
|
||||||
|
design_dir="/workspace/arch-design-agent-skill-dashboard/design",
|
||||||
|
code_dir=None, created_at=datetime(2026, 1, 1),
|
||||||
|
)
|
||||||
|
return svc.scan(project)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def graph_service():
|
||||||
|
return GraphService()
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_has_groups(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
group_ids = {g.id for g in view.groups}
|
||||||
|
assert "business" in group_ids
|
||||||
|
assert "application" in group_ids
|
||||||
|
assert "data" in group_ids
|
||||||
|
assert "technology" in group_ids
|
||||||
|
assert "cross-layer" in group_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_has_capability_nodes(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
cap_nodes = [n for n in view.nodes if n.type == "capability"]
|
||||||
|
assert len(cap_nodes) > 0
|
||||||
|
assert all(n.group_id == "business" for n in cap_nodes)
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_has_module_nodes(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
mod_nodes = [n for n in view.nodes if n.type == "module"]
|
||||||
|
assert len(mod_nodes) > 0
|
||||||
|
assert all(n.group_id == "application" for n in mod_nodes)
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_has_entity_nodes(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
ent_nodes = [n for n in view.nodes if n.type == "entity"]
|
||||||
|
assert len(ent_nodes) > 0
|
||||||
|
assert all(n.group_id == "data" for n in ent_nodes)
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_has_edges(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
assert len(view.edges) > 0
|
||||||
|
relations = {e.relation for e in view.edges}
|
||||||
|
assert "traces_to" in relations
|
||||||
|
|
||||||
|
|
||||||
|
def test_panorama_depends_on_edges(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
dep_edges = [e for e in view.edges if e.relation == "depends_on"]
|
||||||
|
assert len(dep_edges) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_neighbors_returns_subgraph(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
# Use a known capability node
|
||||||
|
neighbors = graph_service.get_neighbors(view, "CAP-PROJ-REG")
|
||||||
|
assert len(neighbors.nodes) > 0
|
||||||
|
assert any(n.id == "CAP-PROJ-REG" for n in neighbors.nodes)
|
||||||
|
assert len(neighbors.edges) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_neighbors_unknown_node(graph_service, scan_result):
|
||||||
|
view = graph_service.build_panorama(scan_result)
|
||||||
|
neighbors = graph_service.get_neighbors(view, "NONEXISTENT")
|
||||||
|
assert len(neighbors.nodes) == 0
|
||||||
|
assert len(neighbors.edges) == 0
|
||||||
Loading…
Reference in New Issue
Block a user