Coverage for src / graphable / views / d2.py: 100%
105 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
1from dataclasses import dataclass
2from logging import getLogger
3from pathlib import Path
4from shutil import which
5from subprocess import PIPE, CalledProcessError, run
6from typing import Any, Callable
8from ..graph import Graph
9from ..graphable import Graphable
10from ..registry import register_view
12logger = getLogger(__name__)
15@dataclass
16class D2StylingConfig:
17 """
18 Configuration for customizing D2 diagram generation.
20 Attributes:
21 node_ref_fnc: Function to generate the node identifier (reference).
22 node_label_fnc: Function to generate the node label.
23 node_style_fnc: Function to generate a dictionary of style attributes for a node.
24 edge_style_fnc: Function to generate a dictionary of style attributes for an edge.
25 global_style: Dictionary of global styling attributes.
26 layout: D2 layout engine to use (e.g., 'dagre', 'elk').
27 theme: D2 theme ID to use.
28 """
30 node_ref_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference)
31 node_label_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference)
32 node_style_fnc: Callable[[Graphable[Any]], dict[str, str]] | None = None
33 edge_style_fnc: (
34 Callable[[Graphable[Any], Graphable[Any]], dict[str, str]] | None
35 ) = None
36 global_style: dict[str, str] | None = None
37 layout: str | None = None
38 theme: str | None = None
39 cluster_by_tag: bool = False
40 tag_sort_fnc: Callable[[set[str]], list[str]] = lambda s: sorted(list(s))
43def _check_d2_on_path() -> None:
44 """Check if 'd2' executable is available in the system path."""
45 if which("d2") is None:
46 logger.error("d2 not found on PATH.")
47 raise FileNotFoundError(
48 "d2 is required but not available on $PATH. "
49 "Install it via the official script: 'curl -fsSL https://d2lang.com/install.sh | sh -s --'."
50 )
53def _format_styles(styles: dict[str, str] | None, indent: str = " ") -> list[str]:
54 """Format a dictionary of styles into D2 style lines."""
55 if not styles:
56 return []
57 lines = [f"{indent}style: {{"]
58 for k, v in styles.items():
59 lines.append(f"{indent} {k}: {v}")
60 lines.append(f"{indent}}}")
61 return lines
64def create_topology_d2(graph: Graph, config: D2StylingConfig | None = None) -> str:
65 """
66 Generate D2 definition from a Graph.
68 Args:
69 graph (Graph): The graph to convert.
70 config (D2StylingConfig | None): Styling configuration.
72 Returns:
73 str: The D2 definition string.
74 """
75 config = config or D2StylingConfig()
76 d2: list[str] = []
78 if config.layout:
79 d2.append(
80 f"vars: {{\n d2-config: {{\n layout-engine: {config.layout}\n }}\n}}"
81 )
83 if config.theme:
84 d2.append("direction: down") # Default direction
86 def get_cluster(node: Graphable[Any]) -> str | None:
87 if not config.cluster_by_tag or not node.tags:
88 return None
89 sorted_tags = config.tag_sort_fnc(node.tags)
90 return sorted_tags[0] if sorted_tags else None
92 # Group nodes by cluster
93 clusters: dict[str | None, list[Graphable[Any]]] = {}
94 for node in graph.topological_order():
95 cluster = get_cluster(node)
96 if cluster not in clusters:
97 clusters[cluster] = []
98 clusters[cluster].append(node)
100 # Nodes (potentially within clusters)
101 for cluster_name, nodes in clusters.items():
102 indent = ""
103 if cluster_name:
104 d2.append(f"{cluster_name}: {{")
105 indent = " "
107 for node in nodes:
108 node_ref = config.node_ref_fnc(node)
109 node_label = config.node_label_fnc(node)
110 d2.append(f"{indent}{node_ref}: {node_label}")
112 if config.node_style_fnc:
113 styles = config.node_style_fnc(node)
114 d2.extend(_format_styles(styles, indent=indent + " "))
116 if cluster_name:
117 d2.append("}")
119 # Edges
120 for node in graph.topological_order():
121 node_ref = config.node_ref_fnc(node)
122 for dependent, _ in graph.internal_dependents(node):
123 # If nodes are in clusters, D2 handles flat references or nested references.
124 # Usually, if IDs are unique, flat references work.
125 # But if we wanted to be explicit: cluster.node_ref
126 # For now, let's assume node_ref is globally unique (the default is reference string).
127 edge_line = f"{node_ref} -> {config.node_ref_fnc(dependent)}"
129 if config.edge_style_fnc:
130 edge_styles = config.edge_style_fnc(node, dependent)
131 if edge_styles:
132 d2.append(f"{edge_line}: {{")
133 d2.extend(_format_styles(edge_styles, indent=" "))
134 d2.append("}")
135 else:
136 d2.append(edge_line)
137 else:
138 d2.append(edge_line)
140 return "\n".join(d2)
143@register_view(".d2", creator_fnc=create_topology_d2)
144def export_topology_d2(
145 graph: Graph, output: Path, config: D2StylingConfig | None = None
146) -> None:
147 """
148 Export the graph to a D2 file.
150 Args:
151 graph (Graph): The graph to export.
152 output (Path): The output file path.
153 config (D2StylingConfig | None): Styling configuration.
154 """
155 logger.info(f"Exporting D2 definition to: {output}")
156 with open(output, "w+") as f:
157 f.write(create_topology_d2(graph, config))
160@register_view([".svg", ".png"])
161def export_topology_d2_image(
162 graph: Graph, output: Path, config: D2StylingConfig | None = None
163) -> None:
164 """
165 Export the graph to an image file (SVG or PNG) using the 'd2' command.
167 Args:
168 graph (Graph): The graph to export.
169 output (Path): The output file path.
170 config (D2StylingConfig | None): Styling configuration.
171 """
172 logger.info(f"Exporting D2 image to: {output}")
173 _check_d2_on_path()
175 p = Path(output)
176 d2_content: str = create_topology_d2(graph, config)
178 cmd = ["d2"]
179 if config and config.theme:
180 cmd.extend(["--theme", config.theme])
181 if config and config.layout:
182 cmd.extend(["--layout", config.layout])
184 cmd.extend(["-", str(p)])
186 try:
187 run(
188 cmd,
189 input=d2_content,
190 check=True,
191 stderr=PIPE,
192 stdout=PIPE,
193 text=True,
194 )
195 fmt = p.suffix[1:].upper()
196 logger.info(f"Successfully exported {fmt} to {output}")
197 except CalledProcessError as e:
198 logger.error(f"Error executing d2: {e.stderr}")
199 raise
200 except Exception as e:
201 logger.error(f"Failed to export image to {output}: {e}")
202 raise