Coverage for src / graphable / views / d2.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-16 21:32 +0000

1from dataclasses import dataclass 

2from logging import getLogger 

3from pathlib import Path 

4from shutil import which 

5from subprocess import PIPE, CalledProcessError, run 

6from typing import Any, Callable 

7 

8from ..graph import Graph 

9from ..graphable import Graphable 

10from ..registry import register_view 

11 

12logger = getLogger(__name__) 

13 

14 

15@dataclass 

16class D2StylingConfig: 

17 """ 

18 Configuration for customizing D2 diagram generation. 

19 

20 Attributes: 

21 node_ref_fnc: Function to generate the node identifier (reference). 

22 node_label_fnc: Function to generate the node label. 

23 node_style_fnc: Function to generate a dictionary of style attributes for a node. 

24 edge_style_fnc: Function to generate a dictionary of style attributes for an edge. 

25 global_style: Dictionary of global styling attributes. 

26 layout: D2 layout engine to use (e.g., 'dagre', 'elk'). 

27 theme: D2 theme ID to use. 

28 """ 

29 

30 node_ref_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference) 

31 node_label_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference) 

32 node_style_fnc: Callable[[Graphable[Any]], dict[str, str]] | None = None 

33 edge_style_fnc: ( 

34 Callable[[Graphable[Any], Graphable[Any]], dict[str, str]] | None 

35 ) = None 

36 global_style: dict[str, str] | None = None 

37 layout: str | None = None 

38 theme: str | None = None 

39 cluster_by_tag: bool = False 

40 tag_sort_fnc: Callable[[set[str]], list[str]] = lambda s: sorted(list(s)) 

41 

42 

43def _check_d2_on_path() -> None: 

44 """Check if 'd2' executable is available in the system path.""" 

45 if which("d2") is None: 

46 logger.error("d2 not found on PATH.") 

47 raise FileNotFoundError( 

48 "d2 is required but not available on $PATH. " 

49 "Install it via the official script: 'curl -fsSL https://d2lang.com/install.sh | sh -s --'." 

50 ) 

51 

52 

53def _format_styles(styles: dict[str, str] | None, indent: str = " ") -> list[str]: 

54 """Format a dictionary of styles into D2 style lines.""" 

55 if not styles: 

56 return [] 

57 lines = [f"{indent}style: {{"] 

58 for k, v in styles.items(): 

59 lines.append(f"{indent} {k}: {v}") 

60 lines.append(f"{indent}}}") 

61 return lines 

62 

63 

64def create_topology_d2(graph: Graph, config: D2StylingConfig | None = None) -> str: 

65 """ 

66 Generate D2 definition from a Graph. 

67 

68 Args: 

69 graph (Graph): The graph to convert. 

70 config (D2StylingConfig | None): Styling configuration. 

71 

72 Returns: 

73 str: The D2 definition string. 

74 """ 

75 config = config or D2StylingConfig() 

76 d2: list[str] = [] 

77 

78 if config.layout: 

79 d2.append( 

80 f"vars: {{\n d2-config: {{\n layout-engine: {config.layout}\n }}\n}}" 

81 ) 

82 

83 if config.theme: 

84 d2.append("direction: down") # Default direction 

85 

86 def get_cluster(node: Graphable[Any]) -> str | None: 

87 if not config.cluster_by_tag or not node.tags: 

88 return None 

89 sorted_tags = config.tag_sort_fnc(node.tags) 

90 return sorted_tags[0] if sorted_tags else None 

91 

92 # Group nodes by cluster 

93 clusters: dict[str | None, list[Graphable[Any]]] = {} 

94 for node in graph.topological_order(): 

95 cluster = get_cluster(node) 

96 if cluster not in clusters: 

97 clusters[cluster] = [] 

98 clusters[cluster].append(node) 

99 

100 # Nodes (potentially within clusters) 

101 for cluster_name, nodes in clusters.items(): 

102 indent = "" 

103 if cluster_name: 

104 d2.append(f"{cluster_name}: {{") 

105 indent = " " 

106 

107 for node in nodes: 

108 node_ref = config.node_ref_fnc(node) 

109 node_label = config.node_label_fnc(node) 

110 d2.append(f"{indent}{node_ref}: {node_label}") 

111 

112 if config.node_style_fnc: 

113 styles = config.node_style_fnc(node) 

114 d2.extend(_format_styles(styles, indent=indent + " ")) 

115 

116 if cluster_name: 

117 d2.append("}") 

118 

119 # Edges 

120 for node in graph.topological_order(): 

121 node_ref = config.node_ref_fnc(node) 

122 for dependent, _ in graph.internal_dependents(node): 

123 # If nodes are in clusters, D2 handles flat references or nested references. 

124 # Usually, if IDs are unique, flat references work. 

125 # But if we wanted to be explicit: cluster.node_ref 

126 # For now, let's assume node_ref is globally unique (the default is reference string). 

127 edge_line = f"{node_ref} -> {config.node_ref_fnc(dependent)}" 

128 

129 if config.edge_style_fnc: 

130 edge_styles = config.edge_style_fnc(node, dependent) 

131 if edge_styles: 

132 d2.append(f"{edge_line}: {{") 

133 d2.extend(_format_styles(edge_styles, indent=" ")) 

134 d2.append("}") 

135 else: 

136 d2.append(edge_line) 

137 else: 

138 d2.append(edge_line) 

139 

140 return "\n".join(d2) 

141 

142 

143@register_view(".d2", creator_fnc=create_topology_d2) 

144def export_topology_d2( 

145 graph: Graph, output: Path, config: D2StylingConfig | None = None 

146) -> None: 

147 """ 

148 Export the graph to a D2 file. 

149 

150 Args: 

151 graph (Graph): The graph to export. 

152 output (Path): The output file path. 

153 config (D2StylingConfig | None): Styling configuration. 

154 """ 

155 logger.info(f"Exporting D2 definition to: {output}") 

156 with open(output, "w+") as f: 

157 f.write(create_topology_d2(graph, config)) 

158 

159 

160@register_view([".svg", ".png"]) 

161def export_topology_d2_image( 

162 graph: Graph, output: Path, config: D2StylingConfig | None = None 

163) -> None: 

164 """ 

165 Export the graph to an image file (SVG or PNG) using the 'd2' command. 

166 

167 Args: 

168 graph (Graph): The graph to export. 

169 output (Path): The output file path. 

170 config (D2StylingConfig | None): Styling configuration. 

171 """ 

172 logger.info(f"Exporting D2 image to: {output}") 

173 _check_d2_on_path() 

174 

175 p = Path(output) 

176 d2_content: str = create_topology_d2(graph, config) 

177 

178 cmd = ["d2"] 

179 if config and config.theme: 

180 cmd.extend(["--theme", config.theme]) 

181 if config and config.layout: 

182 cmd.extend(["--layout", config.layout]) 

183 

184 cmd.extend(["-", str(p)]) 

185 

186 try: 

187 run( 

188 cmd, 

189 input=d2_content, 

190 check=True, 

191 stderr=PIPE, 

192 stdout=PIPE, 

193 text=True, 

194 ) 

195 fmt = p.suffix[1:].upper() 

196 logger.info(f"Successfully exported {fmt} to {output}") 

197 except CalledProcessError as e: 

198 logger.error(f"Error executing d2: {e.stderr}") 

199 raise 

200 except Exception as e: 

201 logger.error(f"Failed to export image to {output}: {e}") 

202 raise