Coverage for src / graphable / views / mermaid.py: 96%
153 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
1from atexit import register as on_script_exit
2from dataclasses import dataclass
3from functools import cache
4from logging import getLogger
5from pathlib import Path
6from shlex import quote
7from shutil import which
8from string import Template
9from subprocess import PIPE, CalledProcessError, run
10from tempfile import NamedTemporaryFile
11from typing import Any, Callable
13from ..graph import Graph
14from ..graphable import Graphable
15from ..registry import register_view
# Module-level logger, named after this module.
logger = getLogger(__name__)

# Mermaid configuration (JSON) handed to mmdc through its `-c` option.
_MERMAID_CONFIG_JSON: str = '{ "htmlLabels": false }'

# Bash script used to invoke mmdc; the `$name` placeholders are filled in by
# `Template.substitute`.  The backslash right after the opening quotes keeps
# the shebang on the very first line of the generated script (a shebang on
# any other line is just a comment).  `mmdc` is resolved through $PATH and the
# shebang uses /usr/bin/env, because /bin/env does not exist on every system.
_MMDC_SCRIPT_TEMPLATE: Template = Template("""\
#!/usr/bin/env bash
mmdc -c $mermaid_config -i $source -o $output -p $puppeteer_config
""")

# Puppeteer configuration (JSON) handed to mmdc through its `-p` option.
_PUPPETEER_CONFIG_JSON: str = '{ "args": [ "--no-sandbox" ] }'
def _get_node_style(node: Graphable[Any]) -> str | None:
    """
    Derive a Mermaid node style from a ``color:`` tag.

    Args:
        node (Graphable[Any]): The node whose tags are inspected.

    Returns:
        str | None: ``fill:<color>,color:white`` built from the first tag (in
            iteration order) that starts with ``color:``, or None when no
            such tag exists.
    """
    prefix = "color:"
    # Everything after the prefix is the color, including any further colons.
    fill = next(
        (tag[len(prefix):] for tag in node.tags if tag.startswith(prefix)),
        None,
    )
    if fill is None:
        return None
    return f"fill:{fill},color:white"
def _get_link_style(node: Graphable[Any], subnode: Graphable[Any]) -> str | None:
    """
    Derive a Mermaid link style from the ``color`` attribute of an edge.

    Args:
        node (Graphable[Any]): The node the edge starts from.
        subnode (Graphable[Any]): The node the edge points to.

    Returns:
        str | None: ``stroke:<color>,stroke-width:2px`` when the edge
            attributes carry a truthy ``color`` entry, otherwise None.
    """
    edge_color = node.edge_attributes(subnode).get("color")
    return f"stroke:{edge_color},stroke-width:2px" if edge_color else None
@dataclass
class MermaidStylingConfig:
    """
    Configuration for customizing Mermaid diagram generation.

    Attributes:
        node_ref_fnc: Function to generate the node identifier (reference).
        node_text_fnc: Function to generate the node label text.
        node_style_fnc: Function to generate specific style for a node (or None).
        node_style_default: Default style string for nodes (or None).
        link_text_fnc: Function to generate label for links between nodes.
        link_style_fnc: Function to generate style for links (or None).
        link_style_default: Default style string for links (or None).
        cluster_by_tag: When True, nodes that have tags are grouped into a
            subgraph named after the first tag returned by ``tag_sort_fnc``.
        tag_sort_fnc: Function that orders a node's tags; its first element
            is used as the cluster name. Defaults to the builtin ``sorted``.
    """

    node_ref_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference)
    node_text_fnc: Callable[[Graphable[Any]], str] = lambda n: str(n.reference)
    node_style_fnc: Callable[[Graphable[Any]], str | None] | None = _get_node_style
    node_style_default: str | None = None
    link_text_fnc: Callable[[Graphable[Any], Graphable[Any]], str] = lambda n, sn: "-->"
    link_style_fnc: Callable[[Graphable[Any], Graphable[Any]], str | None] | None = (
        _get_link_style
    )
    link_style_default: str | None = None
    cluster_by_tag: bool = False
    # `sorted` already returns a new list, so no intermediate list() copy.
    tag_sort_fnc: Callable[[set[str]], list[str]] = sorted
def _check_mmdc_on_path() -> None:
    """
    Ensure the 'mmdc' executable can be found on the system path.

    Raises:
        FileNotFoundError: If ``mmdc`` cannot be located.
    """
    if which("mmdc") is not None:
        return

    logger.error("mmdc not found on PATH.")
    message = (
        "mmdc (Mermaid CLI) is required but not available on $PATH. "
        "Install it via npm: 'npm install -g @mermaid-js/mermaid-cli'."
    )
    raise FileNotFoundError(message)
def _cleanup_on_exit(path: Path) -> None:
    """
    Remove a temporary file if it still exists at script exit.

    Args:
        path (Path): The path to the file to remove.
    """
    if path.exists():
        logger.debug(f"Cleaning up temporary file: {path}")
        # The file may vanish between the check above and the removal;
        # missing_ok keeps that race from raising during interpreter exit.
        path.unlink(missing_ok=True)
def _create_mmdc_script(mmdc_script_content: str) -> Path:
    """
    Write the given content to a temporary ``.sh`` file.

    The file is created with ``delete=False``, so it stays on disk after
    this function returns.

    Args:
        mmdc_script_content (str): The script text to write.

    Returns:
        Path: Location of the temporary script.
    """
    handle = NamedTemporaryFile(delete=False, mode="w+", suffix=".sh")
    with handle:
        handle.write(mmdc_script_content)

    script_path: Path = Path(handle.name)
    logger.debug(f"Created temporary mmdc script: {script_path}")
    return script_path
def create_mmdc_script_content(source: Path, output: Path) -> str:
    """
    Render the bash script that runs mmdc for one source/output pair.

    Every path is shell-quoted before it is placed into the template.

    Args:
        source (Path): Path to the source mermaid file.
        output (Path): Path to the output file.

    Returns:
        str: The script content.
    """
    substitutions: dict[str, str] = {
        "mermaid_config": quote(str(_write_mermaid_config())),
        "output": quote(str(output)),
        "puppeteer_config": quote(str(_write_puppeteer_config())),
        "source": quote(str(source)),
    }
    return _MMDC_SCRIPT_TEMPLATE.substitute(substitutions)
def _escape_mermaid_string(s: str) -> str:
    """
    Escape characters that would break a Mermaid ``[label]`` node definition.

    Double quotes and square brackets are replaced with Mermaid entity codes
    (``#quot;``, ``#91;``, ``#93;``); every other character is left as is.

    Args:
        s (str): The raw label text.

    Returns:
        str: The label with the special characters escaped.
    """
    # Labels are emitted as `ref[text]`, so a literal bracket inside the text
    # would end (or nest) the node definition, and a quote would start a
    # quoted label. One translate pass replaces all three.
    replacements = str.maketrans({'"': "#quot;", "[": "#91;", "]": "#93;"})
    return s.translate(replacements)
def create_topology_mermaid_mmd(
    graph: Graph, config: MermaidStylingConfig | None = None
) -> str:
    """
    Build the Mermaid flowchart definition for a Graph.

    Nodes are emitted first (grouped into subgraphs when clustering by tag is
    enabled), followed by one line per edge and any link styles.

    Args:
        graph (Graph): The graph to convert.
        config (MermaidStylingConfig | None): Styling configuration; a default
            MermaidStylingConfig is used when omitted.

    Returns:
        str: The mermaid graph definition string.
    """
    cfg = config or MermaidStylingConfig()

    def resolve_node_style(node: Graphable[Any]) -> str | None:
        # A truthy per-node style wins; otherwise fall back to the default.
        custom = cfg.node_style_fnc(node) if cfg.node_style_fnc else None
        return custom or cfg.node_style_default

    def resolve_cluster(node: Graphable[Any]) -> str | None:
        # The first tag, as ordered by tag_sort_fnc, names the cluster.
        if cfg.cluster_by_tag and node.tags:
            ordered = cfg.tag_sort_fnc(node.tags)
            if ordered:
                return ordered[0]
        return None

    # Bucket nodes per cluster; dict insertion order keeps the clusters in
    # the order they are first met in the topological walk.
    grouped: dict[str | None, list[Graphable[Any]]] = {}
    for node in graph.topological_order():
        grouped.setdefault(resolve_cluster(node), []).append(node)

    lines: list[str] = ["flowchart TD"]

    # Node declarations, wrapped in `subgraph ... end` for named clusters.
    for cluster_name, members in grouped.items():
        # NOTE(review): indent width copied from the reviewed text — confirm
        # it matches the file, since the extract collapses whitespace.
        pad = " " if cluster_name else ""
        if cluster_name:
            lines.append(f"subgraph {cluster_name}")

        for member in members:
            ref = cfg.node_ref_fnc(member)
            label = _escape_mermaid_string(cfg.node_text_fnc(member))
            lines.append(f"{pad}{ref}[{label}]")

            member_style = resolve_node_style(member)
            if member_style:
                lines.append(f"{pad}style {ref} {member_style}")

        if cluster_name:
            lines.append("end")

    # Edge declarations. `linkStyle` addresses links by the order in which
    # they are declared, so the counter advances for every emitted link.
    edge_index: int = 0
    for node in graph.topological_order():
        src_ref = cfg.node_ref_fnc(node)
        for dependent, _ in graph.internal_dependents(node):
            dst_ref = cfg.node_ref_fnc(dependent)
            arrow = cfg.link_text_fnc(node, dependent)
            lines.append(f"{src_ref} {arrow} {dst_ref}")

            edge_style = (
                cfg.link_style_fnc(node, dependent) if cfg.link_style_fnc else None
            )
            if edge_style:
                lines.append(f"linkStyle {edge_index} {edge_style}")
            edge_index += 1

    if cfg.link_style_default:
        lines.append(f"linkStyle default {cfg.link_style_default}")

    return "\n".join(lines)
def _execute_build_script(build_script: Path) -> bool:
    """
    Execute the build script with bash.

    The script's stdout and stderr are captured; on a non-zero exit status
    the captured stderr is logged.

    Args:
        build_script (Path): Path to the script.

    Returns:
        bool: True if execution succeeded, False otherwise.
    """
    try:
        run(
            # bash is resolved through $PATH by the OS. Going through a
            # hard-coded /bin/env fails on systems that only ship
            # /usr/bin/env, which would be reported as "file not found".
            ["bash", str(build_script)],
            check=True,
            stderr=PIPE,
            stdout=PIPE,
            text=True,
        )
    except CalledProcessError as e:
        logger.error(f"Error executing {build_script}: {e.stderr}")
    except FileNotFoundError:
        # Raised when the interpreter itself cannot be started.
        logger.error("Could not execute script: file not found.")
    else:
        return True
    return False
@register_view(".mmd", creator_fnc=create_topology_mermaid_mmd)
def export_topology_mermaid_mmd(
    graph: Graph, output: Path, config: MermaidStylingConfig | None = None
) -> None:
    """
    Export the graph to a Mermaid .mmd file.

    Args:
        graph (Graph): The graph to export.
        output (Path): The output file path.
        config (MermaidStylingConfig | None): Styling configuration.
    """
    logger.info(f"Exporting mermaid mmd to: {output}")
    # Build the content before opening the file, so a failure while
    # generating it does not leave an existing output file truncated.
    content: str = create_topology_mermaid_mmd(graph, config)
    # Explicit UTF-8 keeps non-ASCII labels independent of the locale.
    with open(output, "w+", encoding="utf-8") as f:
        f.write(content)
@register_view([".svg", ".png"])
def export_topology_mermaid_image(
    graph: Graph,
    output: Path,
    config: MermaidStylingConfig | None = None,
    embed_checksum: bool = False,
) -> None:
    """
    Export the graph to an image file (SVG or PNG) using mmdc.

    The Mermaid source and a small build script are written to temporary
    files, the script is executed, and both temporary files are removed
    afterwards whether or not the build succeeded. A failed build is logged
    rather than raised.

    Args:
        graph (Graph): The graph to export.
        output (Path): The output file path.
        config (MermaidStylingConfig | None): Styling configuration.
        embed_checksum (bool): If True, embed the graph's checksum as a comment.

    Raises:
        FileNotFoundError: If ``mmdc`` is not available on the system path.
    """
    logger.info(f"Exporting mermaid image to: {output}")
    _check_mmdc_on_path()

    p = Path(output)
    mermaid: str = create_topology_mermaid_mmd(graph, config)

    if embed_checksum:
        from .utils import wrap_with_checksum

        mermaid = wrap_with_checksum(mermaid, graph.checksum(), ".mmd")

    # Explicit UTF-8 keeps non-ASCII labels independent of the locale.
    with NamedTemporaryFile(
        delete=False, mode="w+", suffix=".mmd", encoding="utf-8"
    ) as f:
        f.write(mermaid)
        source: Path = Path(f.name)

    logger.debug(f"Created temporary mermaid source file: {source}")

    build_script: Path | None = None
    try:
        build_script = _create_mmdc_script(
            create_mmdc_script_content(source=source, output=p)
        )
        succeeded: bool = _execute_build_script(build_script)
    finally:
        # Always remove the temporary files, so a failed build does not
        # leave them behind.
        if build_script is not None:
            build_script.unlink(missing_ok=True)
        source.unlink(missing_ok=True)

    # The output may be SVG or PNG, so the messages do not name a format.
    if succeeded:
        logger.info(f"Successfully exported image to {output}")
    else:
        logger.error(f"Failed to export image to {output}")
@cache
def _write_mermaid_config() -> Path:
    """
    Write the mermaid config JSON to a temporary file.

    The result is cached, so the file is written once per process, and its
    removal is registered to run at interpreter exit.

    Returns:
        Path: Location of the temporary config file.
    """
    with NamedTemporaryFile(delete=False, mode="w+", suffix=".json") as handle:
        handle.write(_MERMAID_CONFIG_JSON)

    config_path: Path = Path(handle.name)
    # The exit hook forwards the extra argument to the cleanup function.
    on_script_exit(_cleanup_on_exit, config_path)
    return config_path
@cache
def _write_puppeteer_config() -> Path:
    """
    Write the puppeteer config JSON to a temporary file.

    The result is cached, so the file is written once per process, and its
    removal is registered to run at interpreter exit.

    Returns:
        Path: Location of the temporary config file.
    """
    with NamedTemporaryFile(delete=False, mode="w+", suffix=".json") as handle:
        handle.write(_PUPPETEER_CONFIG_JSON)

    config_path: Path = Path(handle.name)
    # The exit hook forwards the extra argument to the cleanup function.
    on_script_exit(_cleanup_on_exit, config_path)
    return config_path