Coverage for tests / unit / test_graph.py: 100%
680 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-16 21:32 +0000
1from json import loads
2from unittest.mock import MagicMock, patch
4from pytest import fixture, raises
6from graphable.graph import Graph, GraphConsistencyError, GraphCycleError
7from graphable.graphable import Graphable
class TestGraph:
    """Unit tests for ``Graph``.

    Covers construction, edge/node mutation, cache invalidation, cycle and
    consistency checks, container protocol, traversal, analysis helpers,
    diffing, cloning, checksums, and read/write/export convenience methods.
    """

    @fixture
    def nodes(self):
        """Provide three fresh, unconnected nodes referenced "A", "B", "C"."""
        a = Graphable("A")
        b = Graphable("B")
        c = Graphable("C")
        return a, b, c

    def test_initialization(self):
        """An empty graph has no sources."""
        g = Graph()
        assert len(g.sources) == 0

    def test_add_node(self):
        """add_node returns True on first insert and False for a duplicate."""
        g = Graph()
        node = Graphable("A")
        assert g.add_node(node) is True
        assert g.add_node(node) is False  # Already added
        assert node in g._nodes

    def test_add_edge(self, nodes):
        """add_edge registers both endpoints and links them in both directions."""
        a, b, _ = nodes
        g = Graph()
        g.add_edge(a, b)

        assert a in g._nodes
        assert b in g._nodes
        assert b in a.dependents
        assert a in b.depends_on

    def test_sinks_and_sources(self, nodes):
        """In the chain A -> B -> C, A is the only source and C the only sink."""
        a, b, c = nodes
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        assert [a] == g.sources
        assert [c] == g.sinks

    def test_topological_order(self, nodes):
        """Topological order places each node before its dependents."""
        a, b, c = nodes
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        topo = g.topological_order()
        assert topo.index(a) < topo.index(b)
        assert topo.index(b) < topo.index(c)

    def test_subgraph_filtered(self, nodes):
        """subgraph_filtered on the "keep" tag yields a graph containing A, B, C."""
        a, b, c = nodes
        a.add_tag("keep")
        c.add_tag("keep")

        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        sub = g.subgraph_filtered(lambda n: n.is_tagged("keep"))
        nodes_in_sub = sub.topological_order()
        assert a in nodes_in_sub
        assert c in nodes_in_sub
        # NOTE(review): B is untagged yet expected here; presumably the
        # subgraph pulls in nodes connected to the matches - confirm against
        # Graph.subgraph_filtered.
        assert b in nodes_in_sub

    def test_subgraph_tagged(self, nodes):
        """subgraph_tagged on A's tag yields a graph that also contains B."""
        a, b, _ = nodes
        a.add_tag("t")
        g = Graph()
        g.add_edge(a, b)

        sub = g.subgraph_tagged("t")
        assert b in sub.topological_order()

    def test_topological_order_filtered(self, nodes):
        """topological_order_filtered returns only nodes matching the predicate."""
        a, b, c = nodes
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        filtered = g.topological_order_filtered(lambda n: n.reference == "B")
        assert len(filtered) == 1
        assert filtered[0] == b

    def test_topological_order_tagged(self, nodes):
        """topological_order_tagged returns only nodes carrying the tag."""
        a, b, c = nodes
        b.add_tag("target")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        tagged = g.topological_order_tagged("target")
        assert len(tagged) == 1
        assert tagged[0] == b

    def test_graph_factory(self, nodes):
        """Graph([b], discover=True) pulls in B's upstream and downstream nodes."""
        a, b, c = nodes
        a._add_dependent(b)
        b._add_depends_on(a)
        b._add_dependent(c)
        c._add_depends_on(b)

        g = Graph([b], discover=True)
        topo = g.topological_order()
        assert len(topo) == 3
        assert a in topo
        assert c in topo

    def test_topological_order_caching(self, nodes):
        """topological_order is cached and the cache is dropped by add_node."""
        a, b, _ = nodes
        g = Graph({a, b})

        # Initial call calculates and caches
        topo1 = g.topological_order()
        assert g._topological_order is not None

        # Subsequent call returns the very same cached object
        topo2 = g.topological_order()
        assert topo1 is topo2

        # Adding a node invalidates cache
        c = Graphable("C")
        g.add_node(c)
        assert g._topological_order is None

        # Recalculate
        topo3 = g.topological_order()
        assert g._topological_order is not None
        assert c in topo3

    def test_checksum_caching(self, nodes):
        """checksum is cached and the cache is dropped when a node is tagged."""
        a, _, _ = nodes
        g = Graph({a})

        # Initial call calculates and caches
        c1 = g.checksum()
        assert g._checksum is not None

        # Subsequent call returns cached value
        c2 = g.checksum()
        assert c1 == c2
        assert g._checksum == c1

        # Adding a tag to a node should invalidate the graph's checksum cache
        a.add_tag("new-tag")
        assert g._checksum is None

        # Recalculate
        c3 = g.checksum()
        assert c3 != c1
        assert g._checksum == c3

    def test_parallelized_topological_order_caching(self, nodes):
        """parallelized_topological_order is cached and invalidated by add_edge."""
        a, b, _ = nodes
        a.add_dependent(b)
        g = Graph({a, b})

        # Initial call calculates and caches
        order1 = g.parallelized_topological_order()
        assert g._parallel_topological_order is not None
        assert order1 is g._parallel_topological_order

        # Subsequent call uses cached value
        order2 = g.parallelized_topological_order()
        assert order2 is order1

        # Adding an edge should invalidate cache
        c = Graphable("C")
        g.add_node(c)
        g.add_edge(b, c)
        assert g._parallel_topological_order is None

        # Recalculate
        order3 = g.parallelized_topological_order()
        assert g._parallel_topological_order is not None
        assert len(order3) == 3  # A, B, C in separate layers

    def test_node_tag_invalidates_all_caches(self, nodes):
        """Tagging a member node clears all three cached results on the graph."""
        a, _, _ = nodes
        g = Graph({a})

        g.topological_order()
        g.parallelized_topological_order()
        g.checksum()

        assert g._topological_order is not None
        assert g._parallel_topological_order is not None
        assert g._checksum is not None

        # Modify node
        a.add_tag("important")

        assert g._topological_order is None
        assert g._parallel_topological_order is None
        assert g._checksum is None

    def test_node_dependency_change_invalidates_all_caches(self, nodes):
        """Linking member nodes directly (not via the graph) clears all caches."""
        a, b, _ = nodes
        g = Graph({a, b})

        g.topological_order()
        g.parallelized_topological_order()
        g.checksum()

        assert g._topological_order is not None
        assert g._parallel_topological_order is not None
        assert g._checksum is not None

        # Modify node dependency externally (not via graph.add_edge)
        a.add_dependent(b)

        assert g._topological_order is None
        assert g._parallel_topological_order is None
        assert g._checksum is None

    def test_node_edge_removal_invalidates_graph_cache(self, nodes):
        """Removing an edge through the node-level methods clears the checksum."""
        a, b, _ = nodes
        g = Graph()
        g.add_edge(a, b)

        g.checksum()
        assert g._checksum is not None

        # Remove edge via node method directly
        a._remove_dependent(b)
        b._remove_depends_on(a)

        assert g._checksum is None

    def test_multiple_graphs_observing_same_node(self, nodes):
        """A change to a shared node invalidates every graph that contains it."""
        a, _, _ = nodes
        g1 = Graph({a})
        g2 = Graph({a})

        g1.checksum()
        g2.checksum()

        assert g1._checksum is not None
        assert g2._checksum is not None

        a.add_tag("shared")

        assert g1._checksum is None
        assert g2._checksum is None

    def test_subgraph_filtering_in_topological_order(self, nodes):
        """Orderings only contain graph members, not their outside dependents."""
        a, b, _ = nodes
        a.add_dependent(b)

        # Graph only contains A
        g = Graph({a})

        # B should not be in the results even though it's a dependent
        order = g.topological_order()
        assert order == [a]
        assert b not in order

        parallel = g.parallelized_topological_order()
        assert parallel == [{a}]

    def test_external_node_change_does_not_affect_checksum(self, nodes):
        """Tagging a node outside the graph leaves the graph's checksum unchanged."""
        a, b, _ = nodes
        a.add_dependent(b)

        g = Graph({a})
        c1 = g.checksum()

        # Modifying B (outside G) should NOT change G's checksum
        # because we only include internal edges now.
        b.add_tag("external-change")
        c2 = g.checksum()

        assert c1 == c2

    def test_internal_node_change_invalidates_cache(self, nodes):
        """Tagging a member node clears the cached checksum and changes its value."""
        a, b, _ = nodes
        g = Graph({a, b})
        a.add_dependent(b)

        c1 = g.checksum()
        assert g._checksum is not None

        # Modifying internal node B should invalidate G's cache
        b.add_tag("internal-change")
        assert g._checksum is None
        assert g.checksum() != c1

    def test_discover_pulls_in_external_nodes(self, nodes):
        """discover() adds reachable nodes, which then invalidate the graph cache."""
        a, b, c = nodes
        a.add_dependent(b)
        b.add_dependent(c)

        # Graph only starts with A
        g = Graph({a})
        assert len(g) == 1
        assert b not in g

        # Discover should pull in B and C
        g.discover()
        assert len(g) == 3
        assert b in g
        assert c in g

        # Now G is an observer for B and C
        g.checksum()
        assert g._checksum is not None
        c.add_tag("new-info")
        assert g._checksum is None

    def test_add_edge_self_loop(self):
        """add_edge(a, a) raises GraphCycleError reporting the self-loop."""
        a = Graphable("A")
        g = Graph()
        with raises(GraphCycleError) as excinfo:
            g.add_edge(a, a)
        assert "Self-loop" in str(excinfo.value)
        assert excinfo.value.cycle == [a, a]

    def test_add_edge_simple_cycle(self):
        """Closing a two-node cycle raises GraphCycleError naming both nodes."""
        a = Graphable("A")
        b = Graphable("B")
        g = Graph()
        g.add_edge(a, b)
        with raises(GraphCycleError) as excinfo:
            g.add_edge(b, a)
        assert "create a cycle" in str(excinfo.value)
        assert excinfo.value.cycle is not None
        assert a in excinfo.value.cycle
        assert b in excinfo.value.cycle

    def test_add_edge_complex_cycle(self):
        """Closing a three-node cycle raises GraphCycleError naming all nodes."""
        a = Graphable("A")
        b = Graphable("B")
        c = Graphable("C")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        with raises(GraphCycleError) as excinfo:
            g.add_edge(c, a)
        assert "create a cycle" in str(excinfo.value)
        assert excinfo.value.cycle is not None
        assert a in excinfo.value.cycle
        assert b in excinfo.value.cycle
        assert c in excinfo.value.cycle

    def test_add_edge_cycle_with_shared_path(self):
        """A back edge is rejected when two paths lead from A to the target."""
        a, b, c, d, target = (
            Graphable("A"),
            Graphable("B"),
            Graphable("C"),
            Graphable("D"),
            Graphable("Target"),
        )
        graph_obj = Graph()
        graph_obj.add_edge(a, b)
        graph_obj.add_edge(a, c)
        graph_obj.add_edge(b, d)
        graph_obj.add_edge(c, d)
        graph_obj.add_edge(d, target)

        assert a.find_path(target) is not None
        with raises(GraphCycleError):
            graph_obj.add_edge(target, a)

    def test_add_node_with_existing_cycle(self):
        """add_node rejects a node that is already part of a manual cycle."""
        a = Graphable("A")
        b = Graphable("B")
        a._add_dependent(b)
        b._add_depends_on(a)
        b._add_dependent(a)
        a._add_depends_on(b)

        g = Graph()
        with raises(GraphCycleError) as excinfo:
            g.add_node(a)
        assert "existing cycle" in str(excinfo.value)

    def test_init_with_cycle(self):
        """Constructing a graph from cyclic nodes raises GraphCycleError."""
        a = Graphable("A")
        b = Graphable("B")
        a._add_dependent(b)
        b._add_depends_on(a)
        b._add_dependent(a)
        a._add_depends_on(b)
        with raises(GraphCycleError):
            Graph(initial={a, b})

    def test_check_cycles_manual(self):
        """check_cycles detects a cycle wired up after the nodes were added."""
        a, b = Graphable("A"), Graphable("B")
        g = Graph()
        g.add_node(a)
        g.add_node(b)
        a._add_dependent(b)
        b._add_depends_on(a)
        b._add_dependent(a)
        a._add_depends_on(b)
        with raises(GraphCycleError):
            g.check_cycles()

    def test_consistency_broken_depends_on(self):
        """A one-sided depends_on link makes add_node raise GraphConsistencyError."""
        a, b = Graphable("A"), Graphable("B")
        a._add_depends_on(b)
        g = Graph()
        with raises(GraphConsistencyError):
            g.add_node(a)

    def test_consistency_broken_dependents(self):
        """A one-sided dependent link makes add_node raise GraphConsistencyError."""
        a, b = Graphable("A"), Graphable("B")
        a._add_dependent(b)
        g = Graph()
        with raises(GraphConsistencyError):
            g.add_node(a)

    def test_init_with_inconsistency(self):
        """Constructing a graph from inconsistent nodes raises GraphConsistencyError."""
        a, b = Graphable("A"), Graphable("B")
        a._add_depends_on(b)
        with raises(GraphConsistencyError):
            Graph(initial={a, b})

    def test_container_len(self, nodes):
        """len(graph) tracks the number of added nodes."""
        a, b, _ = nodes
        g = Graph()
        assert len(g) == 0
        g.add_node(a)
        assert len(g) == 1
        g.add_node(b)
        assert len(g) == 2

    def test_container_iter(self, nodes):
        """Iterating the chain A -> B -> C yields [A, B, C]."""
        a, b, c = nodes
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        assert list(g) == [a, b, c]

    def test_container_contains(self, nodes):
        """Membership works with both node objects and reference values."""
        a, b, _ = nodes
        g = Graph()
        g.add_node(a)
        assert a in g
        assert "A" in g
        assert b not in g

    def test_container_getitem(self, nodes):
        """Indexing by reference returns the node; unknown keys raise KeyError."""
        a, b, _ = nodes
        g = Graph()
        g.add_node(a)
        assert g["A"] == a
        with raises(KeyError):
            _ = g["B"]

    def test_remove_edge(self, nodes):
        """remove_edge unlinks the two nodes in both directions."""
        a, b, _ = nodes
        g = Graph()
        g.add_edge(a, b)
        g.remove_edge(a, b)
        assert b not in a.dependents
        assert a not in b.depends_on

    def test_remove_node(self, nodes):
        """remove_node drops the node from the graph and from its parent's dependents."""
        a, b, c = nodes
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        g.remove_node(b)
        assert b not in g
        assert b not in a.dependents

    def test_ancestors_descendants(self):
        """ancestors/descendants cover the whole chain A -> B -> C -> D."""
        a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        g.add_edge(c, d)
        assert set(g.ancestors(d)) == {a, b, c}
        assert set(g.descendants(a)) == {b, c, d}

    def test_ancestors_diamond(self):
        """ancestors/descendants cover both branches of a diamond."""
        a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(a, c)
        g.add_edge(b, d)
        g.add_edge(c, d)
        assert set(g.ancestors(d)) == {a, b, c}
        assert set(g.descendants(a)) == {b, c, d}

    def test_upstream_downstream_of(self):
        """upstream_of/downstream_of return subgraphs that include the pivot node."""
        a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        g.add_edge(c, d)

        up = g.upstream_of(c)
        assert set(up.topological_order()) == {a, b, c}
        assert d not in up

        down = g.downstream_of(b)
        assert set(down.topological_order()) == {b, c, d}
        assert a not in down

    def test_cpm_and_longest_path(self):
        """CPM analysis, critical path and longest path on a weighted diamond."""
        a = Graphable("A")
        b = Graphable("B")
        c = Graphable("C")
        d = Graphable("D")

        a.duration = 2
        b.duration = 3
        c.duration = 1
        d.duration = 4

        g = Graph()
        g.add_edge(a, b)
        g.add_edge(a, c)
        g.add_edge(b, d)
        g.add_edge(c, d)

        # Paths:
        # A(2) -> B(3) -> D(4) = 9
        # A(2) -> C(1) -> D(4) = 7

        analysis = g.cpm_analysis()
        assert analysis[a]["ES"] == 0
        assert analysis[a]["EF"] == 2
        assert analysis[b]["ES"] == 2
        assert analysis[b]["EF"] == 5
        assert analysis[c]["ES"] == 2
        assert analysis[c]["EF"] == 3
        assert analysis[d]["ES"] == 5  # max(EF(B), EF(C)) = max(5, 3) = 5
        assert analysis[d]["EF"] == 9

        assert analysis[b]["slack"] == 0
        assert analysis[c]["slack"] == 2

        cp = g.critical_path()
        assert a in cp
        assert b in cp
        assert d in cp
        assert c not in cp

        lp = g.longest_path()
        assert lp == [a, b, d]

    def test_all_paths(self):
        """all_paths returns both routes through a diamond."""
        a = Graphable("A")
        b = Graphable("B")
        c = Graphable("C")
        d = Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(a, c)
        g.add_edge(b, d)
        g.add_edge(c, d)

        paths = g.all_paths(a, d)
        assert len(paths) == 2
        assert [a, b, d] in paths
        assert [a, c, d] in paths

    def test_suggest_cycle_breaks(self):
        """suggest_cycle_breaks proposes an edge that lies on the cycle."""
        a = Graphable("A")
        b = Graphable("B")
        c = Graphable("C")

        # Manually create a cycle A -> B -> C -> A.
        # g.add_edge cannot be used because it prevents cycles, so the links
        # are wired through the node-level private methods instead.
        a._add_dependent(b)
        b._add_depends_on(a)
        b._add_dependent(c)
        c._add_depends_on(b)
        c._add_dependent(a)
        a._add_depends_on(c)

        g = Graph()
        # Assign the node set directly to bypass the cycle check performed by
        # the Graph constructor and add_node.
        g._nodes = {a, b, c}

        breaks = g.suggest_cycle_breaks()
        assert len(breaks) > 0
        u, v = breaks[0]
        # Any edge in the cycle is a valid break
        assert (u == a and v == b) or (u == b and v == c) or (u == c and v == a)

    def test_subgraph_between(self):
        """subgraph_between keeps only nodes on paths from source to target."""
        a, b, c, d, e = [Graphable(x) for x in "ABCDE"]
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, d)
        g.add_edge(a, c)
        g.add_edge(c, d)
        g.add_edge(d, e)

        # Paths from A to D are [A,B,D] and [A,C,D]
        sub = g.subgraph_between(a, d)
        assert set(sub.topological_order()) == {a, b, c, d}
        assert e not in sub

    def test_diff_graph(self):
        """diff_graph tags removed/added nodes and colours the changed edges."""
        a1, b1 = Graphable("A"), Graphable("B")
        g1 = Graph()
        g1.add_edge(a1, b1)

        a2, c2 = Graphable("A"), Graphable("C")
        g2 = Graph()
        g2.add_edge(a2, c2)

        dg = g1.diff_graph(g2)
        # dg should have A, B, C
        nodes = {n.reference: n for n in dg.topological_order()}
        assert "A" in nodes
        assert "B" in nodes
        assert "C" in nodes

        assert nodes["B"].is_tagged("diff:removed")
        assert nodes["C"].is_tagged("diff:added")

        # Check edges
        # A->B should be removed (red)
        # A->C should be added (green)
        assert nodes["A"].edge_attributes(nodes["B"])["color"] == "red"
        assert nodes["A"].edge_attributes(nodes["C"])["color"] == "green"

    def test_transitive_closure(self):
        """transitive_closure adds the implied edge A -> C to the chain."""
        a, b, c = [Graphable(x) for x in "ABC"]
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)

        closure = g.transitive_closure()
        assert len(closure) == 3
        # Should have edges A->B, B->C, AND A->C
        nodes = {n.reference: n for n in closure.topological_order()}
        assert nodes["B"] in nodes["A"].dependents
        assert nodes["C"] in nodes["B"].dependents
        assert nodes["C"] in nodes["A"].dependents

    def test_transitive_reduction_simple(self):
        """transitive_reduction removes the redundant shortcut edge A -> C."""
        a, b, c = Graphable("A"), Graphable("B"), Graphable("C")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, c)
        g.add_edge(a, c)
        reduced = g.transitive_reduction()
        assert len(reduced["A"].dependents) == 1

    def test_transitive_reduction_diamond(self):
        """transitive_reduction removes A -> D from a diamond with a shortcut."""
        a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(b, d)
        g.add_edge(a, c)
        g.add_edge(c, d)
        g.add_edge(a, d)
        reduced = g.transitive_reduction()
        assert len(reduced["A"].dependents) == 2
        assert reduced["D"] not in reduced["A"].dependents

    def test_transitive_reduction_preserves_tags(self):
        """The reduced graph copies tags and is independent of the original."""
        a, b = Graphable("A"), Graphable("B")
        a.add_tag("important")
        g = Graph()
        g.add_edge(a, b)
        reduced = g.transitive_reduction()
        assert "important" in reduced["A"].tags
        # Tagging the reduced node must not leak back into the source graph.
        reduced["A"].add_tag("reduced-only")
        assert "reduced-only" not in g["A"].tags

    def test_graph_render_convenience(self):
        """render() delegates to the given view function and returns its text."""
        a, b = Graphable("A"), Graphable("B")
        g = Graph()
        g.add_edge(a, b)
        from graphable.views.mermaid import create_topology_mermaid_mmd

        out = g.render(create_topology_mermaid_mmd)
        assert "A --> B" in out

    def test_graph_export_convenience(self, tmp_path):
        """export() delegates to the given exporter and creates the output file."""
        a, b = Graphable("A"), Graphable("B")
        g = Graph()
        g.add_edge(a, b)
        from graphable.views.mermaid import export_topology_mermaid_mmd

        output_file = tmp_path / "graph.mmd"
        g.export(export_topology_mermaid_mmd, output=output_file)
        assert output_file.exists()

    def test_checksum_deterministic(self):
        """Equivalent graphs built from nodes created in a different order hash equal."""
        a1, b1 = Graphable("A"), Graphable("B")
        g1 = Graph()
        g1.add_edge(a1, b1)
        b2, a2 = Graphable("B"), Graphable("A")
        g2 = Graph()
        g2.add_edge(a2, b2)
        assert g1.checksum() == g2.checksum()

    def test_parallelized_topological_order(self):
        """A diamond splits into three layers with B and C sharing the middle one."""
        a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D")
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(a, c)
        g.add_edge(b, d)
        g.add_edge(c, d)
        layers = g.parallelized_topological_order()
        assert len(layers) == 3
        assert layers[1] == {b, c}

    def test_validate_checksum(self):
        """validate_checksum accepts the graph's own current digest."""
        a = Graphable("A")
        g = Graph()
        g.add_node(a)
        digest = g.checksum()
        assert g.validate_checksum(digest) is True

    def test_read_write_auto_detect(self, tmp_path):
        """A graph written to a .json path reads back equal to the original."""
        a, b = Graphable("A"), Graphable("B")
        g: Graph[Graphable[str]] = Graph()
        g.add_edge(a, b)
        json_file = tmp_path / "graph.json"
        g.write(json_file)
        g_read = Graph.read(json_file)
        assert g == g_read

    def test_standalone_checksum_io(self, tmp_path):
        """write_checksum / read_checksum round-trip the graph's digest."""
        a = Graphable("A")
        g: Graph[Graphable[str]] = Graph()
        g.add_node(a)
        sum_file = tmp_path / "graph.blake2b"
        g.write_checksum(sum_file)
        digest = Graph.read_checksum(sum_file)
        assert digest == g.checksum()

    def test_embedded_checksum_io(self, tmp_path):
        """A YAML file written with embed_checksum carries a digest and reads back."""
        a, b = Graphable("A"), Graphable("B")
        g: Graph[Graphable[str]] = Graph()
        g.add_edge(a, b)
        yaml_file = tmp_path / "embedded.yaml"
        g.write(yaml_file, embed_checksum=True)
        assert "blake2b:" in yaml_file.read_text()
        g_read = Graph.read(yaml_file)
        assert g == g_read

    def test_embedded_checksum_json_wrapping(self, tmp_path):
        """JSON with embed_checksum wraps the graph under "graph" next to "checksum"."""
        a = Graphable("A")
        g: Graph[Graphable[str]] = Graph()
        g.add_node(a)

        json_file = tmp_path / "embedded.json"
        g.write(json_file, embed_checksum=True)

        data = loads(json_file.read_text())
        assert "checksum" in data
        assert "graph" in data
        assert data["graph"]["nodes"][0]["id"] == "A"

        # Verify it can be read back
        g_read = Graph.read(json_file)
        assert g == g_read

    def test_clone(self, nodes):
        """clone copies nodes and tags, and edges only when include_edges is True."""
        a, b, _ = nodes
        g = Graph()
        g.add_edge(a, b, weight=5)
        a.add_tag("t1")

        # Clone without edges
        c1 = g.clone(include_edges=False)
        assert len(c1) == 2
        assert len(c1["A"].dependents) == 0
        assert "t1" in c1["A"].tags
        assert c1["A"] is not a  # Should be a copy

        # Clone with edges
        c2 = g.clone(include_edges=True)
        assert len(c2) == 2
        assert len(c2["A"].dependents) == 1
        assert c2["A"].edge_attributes(c2["B"])["weight"] == 5

    def test_checksum_includes_metadata(self, nodes):
        """Duration, status and edge attributes each contribute to the checksum."""
        a, b, _ = nodes
        g = Graph({a, b})
        g.add_edge(a, b, weight=10)

        c1 = g.checksum()

        # Change duration
        a.duration = 1.0
        c2 = g.checksum()
        assert c1 != c2

        # Change status
        b.status = "completed"
        c3 = g.checksum()
        assert c2 != c3

        # Change edge attribute
        a.set_edge_attribute(b, "weight", 20)
        c4 = g.checksum()
        assert c3 != c4

    def test_export_fallback(self, tmp_path):
        """export() with an unknown exporter still writes the exporter's output."""
        g = Graph([Graphable("A")])

        # A function that is NOT in the CREATOR_MAP
        def mock_exporter(graph, path):
            # Explicit encoding keeps the write independent of the locale.
            with open(path, "w", encoding="utf-8") as f:
                f.write("mock output")

        output = tmp_path / "mock.txt"
        # Calling with embed_checksum=True should trigger the fallback warning
        # and export normally
        g.export(mock_exporter, output, embed_checksum=True)
        assert output.read_text() == "mock output"

    def test_bfs_dfs(self):
        """bfs visits a diamond level by level; dfs visits all four nodes from A."""
        a, b, c, d = [Graphable(x) for x in "ABCD"]
        g = Graph()
        g.add_edge(a, b)
        g.add_edge(a, c)
        g.add_edge(b, d)
        g.add_edge(c, d)

        # BFS from A: A, then [B, C], then D
        bfs_nodes = list(g.bfs(a))
        assert bfs_nodes[0] == a
        assert set(bfs_nodes[1:3]) == {b, c}
        assert bfs_nodes[3] == d

        # DFS from A: A, then B... or C...
        dfs_nodes = list(g.dfs(a))
        assert len(dfs_nodes) == 4
        assert dfs_nodes[0] == a

    def test_write_unsupported_extension(self):
        """write() raises ValueError for a file extension it does not support."""
        g = Graph()
        # ``match`` is a regular expression, so the dot is escaped to require
        # a literal "." instead of matching any character.
        with raises(ValueError, match=r"Unsupported extension: \.invalid"):
            g.write("test.invalid")

    @patch("graphable.views.utils.get_image_exporter")
    def test_write_image_logic(self, mock_get_exporter):
        """write() to a .png path looks up the image exporter for the engine."""
        mock_exporter = MagicMock()
        mock_get_exporter.return_value = mock_exporter
        g = Graph()
        g.write("test.png", engine="mermaid")
        mock_get_exporter.assert_called_with("mermaid")
        mock_exporter.assert_called_once()

    def test_diff_graph_complex(self):
        """diff_graph handles a modified edge together with an added node and edge."""
        # Coverage for modified/added edges in diff_graph
        g1 = Graph()
        a1 = Graphable("A")
        b1 = Graphable("B")
        g1.add_edge(a1, b1, weight=1)

        g2 = Graph()
        a2 = Graphable("A")
        b2 = Graphable("B")
        c2 = Graphable("C")
        g2.add_edge(a2, b2, weight=2)  # Modified edge
        g2.add_edge(b2, c2)  # Added node and edge

        dg = g1.diff_graph(g2)
        assert len(dg) == 3

        # The added node and added edge follow the same conventions that
        # test_diff_graph pins down ("diff:added" tag, green edge).
        nodes = {n.reference: n for n in dg.topological_order()}
        assert nodes["C"].is_tagged("diff:added")
        assert nodes["B"].edge_attributes(nodes["C"])["color"] == "green"
        # NOTE(review): the modified A->B edge is presumably coloured
        # "orange"; confirm against Graph.diff_graph before asserting it.

    def test_parallelized_topological_order_filtered(self):
        """The filtered parallel order contains a single layer holding only A."""
        g = Graph()
        a = Graphable("A")
        b = Graphable("B")
        g.add_edge(a, b)

        # Filter to only include A
        order = g.parallelized_topological_order_filtered(lambda n: n.reference == "A")
        assert len(order) == 1
        # Compare the whole layer so the check does not depend on the
        # iteration order of the layer's members.
        assert {n.reference for n in order[0]} == {"A"}

    def test_parallelized_topological_order_tagged(self):
        """The tagged parallel order contains a single layer holding only A."""
        g = Graph()
        a = Graphable("A")
        a.add_tag("v1")
        b = Graphable("B")
        g.add_node(a)
        g.add_node(b)

        order = g.parallelized_topological_order_tagged("v1")
        assert len(order) == 1
        # Compare the whole layer so the check does not depend on the
        # iteration order of the layer's members.
        assert {n.reference for n in order[0]} == {"A"}