"""GraphService — builds a relationship graph from ScanResult entities.""" from __future__ import annotations from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView from app.modules.scanner.domain.entities import ScanResult # Fixed set of groups _GROUPS = [ GraphGroup(id="business", label="Business", layer="business"), GraphGroup(id="application", label="Application", layer="application"), GraphGroup(id="data", label="Data", layer="data"), GraphGroup(id="technology", label="Technology", layer="technology"), GraphGroup(id="cross-layer", label="Cross-Layer", layer="cross-layer"), ] class GraphService: """Constructs a panorama graph and supports neighbor queries.""" def build_panorama(self, scan_result: ScanResult) -> GraphView: """Build a full panorama GraphView from a ScanResult (9-step algorithm).""" nodes: list[GraphNode] = [] edges: list[GraphEdge] = [] node_ids: set[str] = set() # Step 1: groups are always the fixed 5 groups = list(_GROUPS) # Step 2: Capability → node(type="capability", group="business") for cap in scan_result.capabilities: node_id = cap.capability_id nodes.append(GraphNode( id=node_id, type="capability", label=cap.name, status="unknown", group_id="business", )) node_ids.add(node_id) # Step 3: Module → node(type="module", group="application") for mod in scan_result.modules: node_id = mod.module_id nodes.append(GraphNode( id=node_id, type="module", label=mod.name, status="unknown", group_id="application", )) node_ids.add(node_id) # Step 4: Entity → node(type="entity", group="data") for ent in scan_result.entities: node_id = ent.entity_id nodes.append(GraphNode( id=node_id, type="entity", label=ent.name, status="unknown", group_id="data", )) node_ids.add(node_id) # Step 5: RuntimeComponent → node(type="runtime_component", group="technology") for rc in scan_result.runtime_components: node_id = rc.component_id nodes.append(GraphNode( id=node_id, type="runtime_component", label=rc.name, status="unknown", group_id="technology", )) node_ids.add(node_id) # Step 6: TraceabilityLink → edges for link in scan_result.traceability_links: # capability_id → module_id if link.capability_id in node_ids and link.module_id in node_ids: edges.append(GraphEdge( source=link.capability_id, target=link.module_id, relation="traces_to", )) # module_id → each entity_id for entity_id in link.entity_ids: if link.module_id in node_ids and entity_id in node_ids: edges.append(GraphEdge( source=link.module_id, target=entity_id, relation="traces_to", )) # Step 7: Integration → edges: source_id → target_id for intg in scan_result.integrations: if intg.source_id in node_ids and intg.target_id in node_ids: edges.append(GraphEdge( source=intg.source_id, target=intg.target_id, relation="integrates_with", )) # Step 8: Module.depends_on → edges for mod in scan_result.modules: for dep_id in mod.depends_on: if mod.module_id in node_ids and dep_id in node_ids: edges.append(GraphEdge( source=mod.module_id, target=dep_id, relation="depends_on", )) # Step 9: DesignDocument.upstream/downstream → edges (if both are nodes) for doc in scan_result.design_documents: for upstream_id in doc.upstream: if doc.doc_id in node_ids and upstream_id in node_ids: edges.append(GraphEdge( source=doc.doc_id, target=upstream_id, relation="documents", )) for downstream_id in doc.downstream: if doc.doc_id in node_ids and downstream_id in node_ids: edges.append(GraphEdge( source=doc.doc_id, target=downstream_id, relation="documents", )) return GraphView(nodes=nodes, edges=edges, groups=groups) def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView: """Return a subgraph containing the given node and all its direct neighbors.""" # Check if node_id exists node_exists = any(n.id == node_id for n in graph_view.nodes) if not node_exists: return GraphView(nodes=[], edges=[], groups=[]) # Find all edges where source==node_id or target==node_id relevant_edges = [ e for e in graph_view.edges if e.source == node_id or e.target == node_id ] # Collect all neighbor node IDs from those edges + the target node itself neighbor_ids: set[str] = {node_id} for edge in relevant_edges: neighbor_ids.add(edge.source) neighbor_ids.add(edge.target) # Filter nodes relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids] # Filter groups to only those referenced by relevant nodes relevant_group_ids = {n.group_id for n in relevant_nodes} relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids] return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups)