"""GraphService — builds a relationship graph from ScanResult entities.""" from __future__ import annotations from pathlib import PurePosixPath from app.modules.graph.domain.entities import GraphEdge, GraphGroup, GraphNode, GraphView from app.modules.scanner.domain.entities import ScanResult def _to_rel_path(doc_file_path: str, design_dir: str) -> str: """Convert absolute doc.file_path to design-dir-relative path.""" try: return str(PurePosixPath(doc_file_path).relative_to(design_dir)) except ValueError: return doc_file_path def _resolve_ref_path(ref_path: str, doc_rel_path: str) -> str: """Resolve a relative upstream/downstream ref against the doc's directory.""" doc_dir = str(PurePosixPath(doc_rel_path).parent) resolved = str(PurePosixPath(doc_dir) / ref_path) parts: list[str] = [] for part in PurePosixPath(resolved).parts: if part == '..': if parts: parts.pop() else: parts.append(part) return str(PurePosixPath(*parts)) if parts else "" # Fixed set of groups _GROUPS = [ GraphGroup(id="business", label="Business", layer="business"), GraphGroup(id="application", label="Application", layer="application"), GraphGroup(id="data", label="Data", layer="data"), GraphGroup(id="technology", label="Technology", layer="technology"), GraphGroup(id="cross-layer", label="Cross-Layer", layer="cross-layer"), ] _SOURCE_FILES: dict[str, str] = { "capability": "business-architecture/02-capability-map.csv", "module": "application-architecture/02-modules.csv", "entity": "data-architecture/01-entities.csv", "runtime_component": "technology-architecture/01-runtime-components.csv", } class GraphService: """Constructs a panorama graph and supports neighbor queries.""" def build_panorama(self, scan_result: ScanResult, *, design_dir: str = "") -> GraphView: """Build a full panorama GraphView from a ScanResult (9-step algorithm).""" nodes: list[GraphNode] = [] edges: list[GraphEdge] = [] node_ids: set[str] = set() # Build file-status lookup from ScanResult file_status_map: dict[str, str] = { fs.path: fs.status.value for fs in scan_result.file_statuses } # Step 1: groups are always the fixed 5 groups = list(_GROUPS) # Step 1.5: Build document nodes FIRST (needed for parent refs in Steps 2-5) file_to_doc: dict[str, str] = {} dir_to_doc: dict[str, str] = {} for doc in scan_result.design_documents: doc_rel = _to_rel_path(doc.file_path, design_dir) file_to_doc[doc_rel] = doc.doc_id # Map directory to first doc found there (for parent lookups by CSV path) doc_dir = str(PurePosixPath(doc_rel).parent) if doc_dir not in dir_to_doc: dir_to_doc[doc_dir] = doc.doc_id nodes.append(GraphNode( id=doc.doc_id, type="document", label=doc.title or doc.doc_id, status=file_status_map.get(doc_rel, "unknown"), group_id="cross-layer", )) node_ids.add(doc.doc_id) def _parent_for(entity_type: str) -> str | None: """Find parent doc for an entity type via its source CSV directory.""" csv_path = _SOURCE_FILES.get(entity_type) if not csv_path: return None return file_to_doc.get(csv_path) or dir_to_doc.get( str(PurePosixPath(csv_path).parent) ) # Step 2: Capability → node(type="capability", group="business") for cap in scan_result.capabilities: node_id = cap.capability_id nodes.append(GraphNode( id=node_id, type="capability", label=cap.name, status=file_status_map.get(_SOURCE_FILES["capability"], "unknown"), group_id="business", parent=_parent_for("capability"), )) node_ids.add(node_id) # Step 3: Module → node(type="module", group="application") for mod in scan_result.modules: node_id = mod.module_id nodes.append(GraphNode( id=node_id, type="module", label=mod.name, status=file_status_map.get(_SOURCE_FILES["module"], "unknown"), group_id="application", parent=_parent_for("module"), )) node_ids.add(node_id) # Step 4: Entity → node(type="entity", group="data") for ent in scan_result.entities: node_id = ent.entity_id nodes.append(GraphNode( id=node_id, type="entity", label=ent.name, status=file_status_map.get(_SOURCE_FILES["entity"], "unknown"), group_id="data", parent=_parent_for("entity"), )) node_ids.add(node_id) # Step 5: RuntimeComponent → node(type="runtime_component", group="technology") for rc in scan_result.runtime_components: node_id = rc.component_id nodes.append(GraphNode( id=node_id, type="runtime_component", label=rc.name, status=file_status_map.get(_SOURCE_FILES["runtime_component"], "unknown"), group_id="technology", parent=_parent_for("runtime_component"), )) node_ids.add(node_id) # Step 6: TraceabilityLink → edges for link in scan_result.traceability_links: # capability_id → module_id if link.capability_id in node_ids and link.module_id in node_ids: edges.append(GraphEdge( source=link.capability_id, target=link.module_id, relation="traces_to", )) # module_id → each entity_id for entity_id in link.entity_ids: if link.module_id in node_ids and entity_id in node_ids: edges.append(GraphEdge( source=link.module_id, target=entity_id, relation="traces_to", )) # Step 7: Integration → edges: source_id → target_id for intg in scan_result.integrations: if intg.source_id in node_ids and intg.target_id in node_ids: edges.append(GraphEdge( source=intg.source_id, target=intg.target_id, relation="integrates_with", )) # Step 8: Module.depends_on → edges for mod in scan_result.modules: for dep_id in mod.depends_on: if mod.module_id in node_ids and dep_id in node_ids: edges.append(GraphEdge( source=mod.module_id, target=dep_id, relation="depends_on", )) # Step 9: DesignDocument.downstream → doc-to-doc edges (deduplicated) path_to_doc: dict[str, str] = {} doc_rel_paths: dict[str, str] = {} for doc in scan_result.design_documents: doc_rel = _to_rel_path(doc.file_path, design_dir) path_to_doc[doc_rel] = doc.doc_id doc_rel_paths[doc.doc_id] = doc_rel seen_edges: set[tuple[str, str]] = set() for doc in scan_result.design_documents: doc_rel = doc_rel_paths[doc.doc_id] for down_path in doc.downstream: resolved = _resolve_ref_path(down_path, doc_rel) down_doc_id = path_to_doc.get(resolved) if down_doc_id and down_doc_id in node_ids: edge_key = (doc.doc_id, down_doc_id) if edge_key not in seen_edges: seen_edges.add(edge_key) edges.append(GraphEdge( source=doc.doc_id, target=down_doc_id, relation="documents", )) return GraphView(nodes=nodes, edges=edges, groups=groups) def get_neighbors(self, graph_view: GraphView, node_id: str) -> GraphView: """Return a subgraph containing the given node and all its direct neighbors.""" # Check if node_id exists node_exists = any(n.id == node_id for n in graph_view.nodes) if not node_exists: return GraphView(nodes=[], edges=[], groups=[]) # Find all edges where source==node_id or target==node_id relevant_edges = [ e for e in graph_view.edges if e.source == node_id or e.target == node_id ] # Collect all neighbor node IDs from those edges + the target node itself neighbor_ids: set[str] = {node_id} for edge in relevant_edges: neighbor_ids.add(edge.source) neighbor_ids.add(edge.target) # Filter nodes relevant_nodes = [n for n in graph_view.nodes if n.id in neighbor_ids] # Filter groups to only those referenced by relevant nodes relevant_group_ids = {n.group_id for n in relevant_nodes} relevant_groups = [g for g in graph_view.groups if g.id in relevant_group_ids] return GraphView(nodes=relevant_nodes, edges=relevant_edges, groups=relevant_groups)