Coverage for src / graphable / views / mermaid.py: 96%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-16 21:32 +0000

1from atexit import register as on_script_exit 

2from dataclasses import dataclass 

3from functools import cache 

4from logging import getLogger 

5from pathlib import Path 

6from shlex import quote 

7from shutil import which 

8from string import Template 

9from subprocess import PIPE, CalledProcessError, run 

10from tempfile import NamedTemporaryFile 

11from typing import Any, Callable 

12 

13from ..graph import Graph 

14from ..graphable import Graphable 

15from ..registry import register_view 

16 

17logger = getLogger(__name__) 

18 

19_MERMAID_CONFIG_JSON: str = '{ "htmlLabels": false }' 

20_MMDC_SCRIPT_TEMPLATE: Template = Template(""" 

21#!/bin/env bash 

22/bin/env mmdc -c $mermaid_config -i $source -o $output -p $puppeteer_config 

23""") 

24_PUPPETEER_CONFIG_JSON: str = '{ "args": [ "--no-sandbox" ] }' 

25 

26 

27def _get_node_style(node: Graphable[Any]) -> str | None: 

28 for tag in node.tags: 

29 if tag.startswith("color:"): 

30 color = tag.split(":", 1)[1] 

31 return f"fill:{color},color:white" 

32 return None 

33 

34 

35def _get_link_style(node: Graphable[Any], subnode: Graphable[Any]) -> str | None: 

36 attrs = node.edge_attributes(subnode) 

37 if color := attrs.get("color"): 

38 return f"stroke:{color},stroke-width:2px" 

39 return None 

40 

41 

42@dataclass 

43class MermaidStylingConfig: 

44 """ 

45 Configuration for customizing Mermaid diagram generation. 

46 

47 Attributes: 

48 node_ref_fnc: Function to generate the node identifier (reference). 

49 node_text_fnc: Function to generate the node label text. 

50 node_style_fnc: Function to generate specific style for a node (or None). 

51 node_style_default: Default style string for nodes (or None). 

52 link_text_fnc: Function to generate label for links between nodes. 

53 link_style_fnc: Function to generate style for links (or None). 

54 link_style_default: Default style string for links (or None). 

55 """ 

56 

57 node_ref_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference) 

58 node_text_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference) 

59 node_style_fnc: Callable[[Graphable[Any]], str | None] | None = _get_node_style 

60 node_style_default: str | None = None 

61 link_text_fnc: Callable[[Graphable[Any], Graphable[Any]], str] = lambda n, sn: "-->" 

62 link_style_fnc: Callable[[Graphable[Any], Graphable[Any]], str | None] | None = ( 

63 _get_link_style 

64 ) 

65 link_style_default: str | None = None 

66 cluster_by_tag: bool = False 

67 tag_sort_fnc: Callable[[set[str]], list[str]] = lambda s: sorted(list(s)) 

68 

69 

70def _check_mmdc_on_path() -> None: 

71 """Check if 'mmdc' executable is available in the system path.""" 

72 if which("mmdc") is None: 

73 logger.error("mmdc not found on PATH.") 

74 raise FileNotFoundError( 

75 "mmdc (Mermaid CLI) is required but not available on $PATH. " 

76 "Install it via npm: 'npm install -g @mermaid-js/mermaid-cli'." 

77 ) 

78 

79 

80def _cleanup_on_exit(path: Path) -> None: 

81 """ 

82 Remove a temporary file if it still exists at script exit. 

83 

84 Args: 

85 path (Path): The path to the file to remove. 

86 """ 

87 if path.exists(): 

88 logger.debug(f"Cleaning up temporary file: {path}") 

89 path.unlink() 

90 

91 

92def _create_mmdc_script(mmdc_script_content: str) -> Path: 

93 """Create a temporary shell script for executing mmdc.""" 

94 with NamedTemporaryFile(delete=False, mode="w+", suffix=".sh") as f: 

95 f.write(mmdc_script_content) 

96 mmdc_script: Path = Path(f.name) 

97 logger.debug(f"Created temporary mmdc script: {mmdc_script}") 

98 return mmdc_script 

99 

100 

101def create_mmdc_script_content(source: Path, output: Path) -> str: 

102 """ 

103 Generate the bash script content to run mmdc. 

104 

105 Args: 

106 source (Path): Path to the source mermaid file. 

107 output (Path): Path to the output file. 

108 

109 Returns: 

110 str: The script content. 

111 """ 

112 mmdc_script_content: str = _MMDC_SCRIPT_TEMPLATE.substitute( 

113 mermaid_config=quote(str(_write_mermaid_config())), 

114 output=quote(str(output)), 

115 puppeteer_config=quote(str(_write_puppeteer_config())), 

116 source=quote(str(source)), 

117 ) 

118 

119 return mmdc_script_content 

120 

121 

122def _escape_mermaid_string(s: str) -> str: 

123 """Escape special characters for Mermaid labels.""" 

124 # Mermaid labels in square brackets [text] can contain most characters, 

125 # but we should escape double quotes and brackets if we use them. 

126 return s.replace('"', "#quot;") 

127 

128 

129def create_topology_mermaid_mmd( 

130 graph: Graph, config: MermaidStylingConfig | None = None 

131) -> str: 

132 """ 

133 Generate Mermaid flowchart definition from a Graph. 

134 

135 Args: 

136 graph (Graph): The graph to convert. 

137 config (MermaidStylingConfig | None): Styling configuration. 

138 

139 Returns: 

140 str: The mermaid graph definition string. 

141 """ 

142 config = config or MermaidStylingConfig() 

143 

144 def link_style(node: Graphable[Any], subnode: Graphable[Any]) -> str | None: 

145 if config.link_style_fnc: 

146 return config.link_style_fnc(node, subnode) 

147 return None 

148 

149 def node_style(node: Graphable[Any]) -> str | None: 

150 if config.node_style_fnc and (style := config.node_style_fnc(node)): 

151 return style 

152 return config.node_style_default 

153 

154 def get_cluster(node: Graphable[Any]) -> str | None: 

155 if not config.cluster_by_tag or not node.tags: 

156 return None 

157 sorted_tags = config.tag_sort_fnc(node.tags) 

158 return sorted_tags[0] if sorted_tags else None 

159 

160 mermaid: list[str] = ["flowchart TD"] 

161 

162 # Group nodes by cluster 

163 clusters: dict[str | None, list[Graphable[Any]]] = {} 

164 for node in graph.topological_order(): 

165 cluster = get_cluster(node) 

166 if cluster not in clusters: 

167 clusters[cluster] = [] 

168 clusters[cluster].append(node) 

169 

170 # Render nodes (potentially within subgraphs) 

171 for cluster_name, nodes in clusters.items(): 

172 indent = "" 

173 if cluster_name: 

174 mermaid.append(f"subgraph {cluster_name}") 

175 indent = " " 

176 

177 for node in nodes: 

178 node_ref = config.node_ref_fnc(node) 

179 node_text = _escape_mermaid_string(config.node_text_fnc(node)) 

180 mermaid.append(f"{indent}{node_ref}[{node_text}]") 

181 

182 if style := node_style(node): 

183 mermaid.append(f"{indent}style {node_ref} {style}") 

184 

185 if cluster_name: 

186 mermaid.append("end") 

187 

188 # Render edges 

189 link_num: int = 0 

190 for node in graph.topological_order(): 

191 node_ref = config.node_ref_fnc(node) 

192 for subnode, _ in graph.internal_dependents(node): 

193 subnode_ref = config.node_ref_fnc(subnode) 

194 mermaid.append( 

195 f"{node_ref} {config.link_text_fnc(node, subnode)} {subnode_ref}" 

196 ) 

197 if style := link_style(node, subnode): 

198 mermaid.append(f"linkStyle {link_num} {style}") 

199 link_num += 1 

200 

201 if config.link_style_default: 

202 mermaid.append(f"linkStyle default {config.link_style_default}") 

203 

204 return "\n".join(mermaid) 

205 

206 

207def _execute_build_script(build_script: Path) -> bool: 

208 """ 

209 Execute the build script. 

210 

211 Args: 

212 build_script (Path): Path to the script. 

213 

214 Returns: 

215 bool: True if execution succeeded, False otherwise. 

216 """ 

217 try: 

218 run( 

219 ["/bin/env", "bash", build_script], 

220 check=True, 

221 stderr=PIPE, 

222 stdout=PIPE, 

223 text=True, 

224 ) 

225 return True 

226 except CalledProcessError as e: 

227 logger.error(f"Error executing {build_script}: {e.stderr}") 

228 except FileNotFoundError: 

229 logger.error("Could not execute script: file not found.") 

230 return False 

231 

232 

233@register_view(".mmd", creator_fnc=create_topology_mermaid_mmd) 

234def export_topology_mermaid_mmd( 

235 graph: Graph, output: Path, config: MermaidStylingConfig | None = None 

236) -> None: 

237 """ 

238 Export the graph to a Mermaid .mmd file. 

239 

240 Args: 

241 graph (Graph): The graph to export. 

242 output (Path): The output file path. 

243 config (MermaidStylingConfig | None): Styling configuration. 

244 """ 

245 logger.info(f"Exporting mermaid mmd to: {output}") 

246 with open(output, "w+") as f: 

247 f.write(create_topology_mermaid_mmd(graph, config)) 

248 

249 

250@register_view([".svg", ".png"]) 

251def export_topology_mermaid_image( 

252 graph: Graph, 

253 output: Path, 

254 config: MermaidStylingConfig | None = None, 

255 embed_checksum: bool = False, 

256) -> None: 

257 """ 

258 Export the graph to an image file (SVG or PNG) using mmdc. 

259 

260 Args: 

261 graph (Graph): The graph to export. 

262 output (Path): The output file path. 

263 config (MermaidStylingConfig | None): Styling configuration. 

264 embed_checksum (bool): If True, embed the graph's checksum as a comment. 

265 """ 

266 logger.info(f"Exporting mermaid image to: {output}") 

267 _check_mmdc_on_path() 

268 

269 p = Path(output) 

270 mermaid: str = create_topology_mermaid_mmd(graph, config) 

271 

272 if embed_checksum: 

273 from .utils import wrap_with_checksum 

274 

275 mermaid = wrap_with_checksum(mermaid, graph.checksum(), ".mmd") 

276 

277 with NamedTemporaryFile(delete=False, mode="w+", suffix=".mmd") as f: 

278 f.write(mermaid) 

279 source: Path = Path(f.name) 

280 

281 logger.debug(f"Created temporary mermaid source file: {source}") 

282 

283 build_script: Path = _create_mmdc_script( 

284 create_mmdc_script_content(source=source, output=p) 

285 ) 

286 

287 if _execute_build_script(build_script): 

288 build_script.unlink() 

289 source.unlink() 

290 logger.info(f"Successfully exported SVG to {output}") 

291 else: 

292 logger.error(f"Failed to export SVG to {output}") 

293 

294 

295@cache 

296def _write_mermaid_config() -> Path: 

297 """Write temporary mermaid config file.""" 

298 with NamedTemporaryFile(delete=False, mode="w+", suffix=".json") as f: 

299 f.write(_MERMAID_CONFIG_JSON) 

300 

301 path: Path = Path(f.name) 

302 on_script_exit(lambda: _cleanup_on_exit(path)) 

303 return path 

304 

305 

306@cache 

307def _write_puppeteer_config() -> Path: 

308 """Write temporary puppeteer config file.""" 

309 with NamedTemporaryFile(delete=False, mode="w+", suffix=".json") as f: 

310 f.write(_PUPPETEER_CONFIG_JSON) 

311 

312 path: Path = Path(f.name) 

313 on_script_exit(lambda: _cleanup_on_exit(path)) 

314 return path