Coverage for tests / unit / test_graph.py: 100%

680 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-16 21:32 +0000

1from json import loads 

2from unittest.mock import MagicMock, patch 

3 

4from pytest import fixture, raises 

5 

6from graphable.graph import Graph, GraphConsistencyError, GraphCycleError 

7from graphable.graphable import Graphable 

8 

9 

10class TestGraph: 

11 @fixture 

12 def nodes(self): 

13 a = Graphable("A") 

14 b = Graphable("B") 

15 c = Graphable("C") 

16 return a, b, c 

17 

18 def test_initialization(self): 

19 g = Graph() 

20 assert len(g.sources) == 0 

21 

22 def test_add_node(self): 

23 g = Graph() 

24 node = Graphable("A") 

25 assert g.add_node(node) is True 

26 assert g.add_node(node) is False # Already added 

27 assert node in g._nodes 

28 

29 def test_add_edge(self, nodes): 

30 a, b, _ = nodes 

31 g = Graph() 

32 g.add_edge(a, b) 

33 

34 assert a in g._nodes 

35 assert b in g._nodes 

36 assert b in a.dependents 

37 assert a in b.depends_on 

38 

39 def test_sinks_and_sources(self, nodes): 

40 a, b, c = nodes 

41 g = Graph() 

42 g.add_edge(a, b) 

43 g.add_edge(b, c) 

44 

45 assert [a] == g.sources 

46 assert [c] == g.sinks 

47 

48 def test_topological_order(self, nodes): 

49 a, b, c = nodes 

50 g = Graph() 

51 g.add_edge(a, b) 

52 g.add_edge(b, c) 

53 

54 topo = g.topological_order() 

55 assert topo.index(a) < topo.index(b) 

56 assert topo.index(b) < topo.index(c) 

57 

58 def test_subgraph_filtered(self, nodes): 

59 a, b, c = nodes 

60 a.add_tag("keep") 

61 c.add_tag("keep") 

62 

63 g = Graph() 

64 g.add_edge(a, b) 

65 g.add_edge(b, c) 

66 

67 sub = g.subgraph_filtered(lambda n: n.is_tagged("keep")) 

68 nodes_in_sub = sub.topological_order() 

69 assert a in nodes_in_sub 

70 assert c in nodes_in_sub 

71 assert b in nodes_in_sub 

72 

73 def test_subgraph_tagged(self, nodes): 

74 a, b, c = nodes 

75 a.add_tag("t") 

76 g = Graph() 

77 g.add_edge(a, b) 

78 

79 sub = g.subgraph_tagged("t") 

80 assert b in sub.topological_order() 

81 

82 def test_topological_order_filtered(self, nodes): 

83 a, b, c = nodes 

84 g = Graph() 

85 g.add_edge(a, b) 

86 g.add_edge(b, c) 

87 

88 filtered = g.topological_order_filtered(lambda n: n.reference == "B") 

89 assert len(filtered) == 1 

90 assert filtered[0] == b 

91 

92 def test_topological_order_tagged(self, nodes): 

93 a, b, c = nodes 

94 b.add_tag("target") 

95 g = Graph() 

96 g.add_edge(a, b) 

97 g.add_edge(b, c) 

98 

99 tagged = g.topological_order_tagged("target") 

100 assert len(tagged) == 1 

101 assert tagged[0] == b 

102 

103 def test_graph_factory(self, nodes): 

104 a, b, c = nodes 

105 a._add_dependent(b) 

106 b._add_depends_on(a) 

107 b._add_dependent(c) 

108 c._add_depends_on(b) 

109 

110 g = Graph([b], discover=True) 

111 topo = g.topological_order() 

112 assert len(topo) == 3 

113 assert a in topo 

114 assert c in topo 

115 

116 def test_topological_order_caching(self, nodes): 

117 a, b, _ = nodes 

118 g = Graph({a, b}) 

119 

120 # Initial call calculates and caches 

121 topo1 = g.topological_order() 

122 assert g._topological_order is not None 

123 

124 # Subsequent call returns cached value 

125 topo2 = g.topological_order() 

126 assert topo1 is topo2 

127 

128 # Adding a node invalidates cache 

129 c = Graphable("C") 

130 g.add_node(c) 

131 assert g._topological_order is None 

132 

133 # Recalculate 

134 topo3 = g.topological_order() 

135 assert g._topological_order is not None 

136 assert c in topo3 

137 

138 def test_checksum_caching(self, nodes): 

139 a, _, _ = nodes 

140 g = Graph({a}) 

141 

142 # Initial call calculates and caches 

143 c1 = g.checksum() 

144 assert g._checksum is not None 

145 

146 # Subsequent call returns cached value 

147 c2 = g.checksum() 

148 assert c1 == c2 

149 assert g._checksum == c1 

150 

151 # Adding a tag to a node should invalidate the graph's checksum cache 

152 a.add_tag("new-tag") 

153 assert g._checksum is None 

154 

155 # Recalculate 

156 c3 = g.checksum() 

157 assert c3 != c1 

158 assert g._checksum == c3 

159 

160 def test_parallelized_topological_order_caching(self, nodes): 

161 a, b, _ = nodes 

162 a.add_dependent(b) 

163 g = Graph({a, b}) 

164 

165 # Initial call calculates and caches 

166 order1 = g.parallelized_topological_order() 

167 assert g._parallel_topological_order is not None 

168 assert order1 is g._parallel_topological_order 

169 

170 # Subsequent call uses cached value 

171 order2 = g.parallelized_topological_order() 

172 assert order2 is order1 

173 

174 # Adding an edge should invalidate cache 

175 c = Graphable("C") 

176 g.add_node(c) 

177 g.add_edge(b, c) 

178 assert g._parallel_topological_order is None 

179 

180 # Recalculate 

181 order3 = g.parallelized_topological_order() 

182 assert g._parallel_topological_order is not None 

183 assert len(order3) == 3 # A, B, C in separate layers 

184 

185 def test_node_tag_invalidates_all_caches(self, nodes): 

186 a, _, _ = nodes 

187 g = Graph({a}) 

188 

189 g.topological_order() 

190 g.parallelized_topological_order() 

191 g.checksum() 

192 

193 assert g._topological_order is not None 

194 assert g._parallel_topological_order is not None 

195 assert g._checksum is not None 

196 

197 # Modify node 

198 a.add_tag("important") 

199 

200 assert g._topological_order is None 

201 assert g._parallel_topological_order is None 

202 assert g._checksum is None 

203 

204 def test_node_dependency_change_invalidates_all_caches(self, nodes): 

205 a, b, _ = nodes 

206 g = Graph({a, b}) 

207 

208 g.topological_order() 

209 g.parallelized_topological_order() 

210 g.checksum() 

211 

212 assert g._topological_order is not None 

213 assert g._parallel_topological_order is not None 

214 assert g._checksum is not None 

215 

216 # Modify node dependency externally (not via graph.add_edge) 

217 a.add_dependent(b) 

218 

219 assert g._topological_order is None 

220 assert g._parallel_topological_order is None 

221 assert g._checksum is None 

222 

223 def test_node_edge_removal_invalidates_graph_cache(self, nodes): 

224 a, b, _ = nodes 

225 g = Graph() 

226 g.add_edge(a, b) 

227 

228 g.checksum() 

229 assert g._checksum is not None 

230 

231 # Remove edge via node method directly 

232 a._remove_dependent(b) 

233 b._remove_depends_on(a) 

234 

235 assert g._checksum is None 

236 

237 def test_multiple_graphs_observing_same_node(self, nodes): 

238 a, _, _ = nodes 

239 g1 = Graph({a}) 

240 g2 = Graph({a}) 

241 

242 g1.checksum() 

243 g2.checksum() 

244 

245 assert g1._checksum is not None 

246 assert g2._checksum is not None 

247 

248 a.add_tag("shared") 

249 

250 assert g1._checksum is None 

251 assert g2._checksum is None 

252 

253 def test_subgraph_filtering_in_topological_order(self, nodes): 

254 a, b, _ = nodes 

255 a.add_dependent(b) 

256 

257 # Graph only contains A 

258 g = Graph({a}) 

259 

260 # B should not be in the results even though it's a dependent 

261 order = g.topological_order() 

262 assert order == [a] 

263 assert b not in order 

264 

265 parallel = g.parallelized_topological_order() 

266 assert parallel == [{a}] 

267 

268 def test_external_node_change_does_not_affect_checksum(self, nodes): 

269 a, b, _ = nodes 

270 a.add_dependent(b) 

271 

272 g = Graph({a}) 

273 c1 = g.checksum() 

274 

275 # Modifying B (outside G) should NOT change G's checksum 

276 # because we only include internal edges now. 

277 b.add_tag("external-change") 

278 c2 = g.checksum() 

279 

280 assert c1 == c2 

281 

282 def test_internal_node_change_invalidates_cache(self, nodes): 

283 a, b, _ = nodes 

284 g = Graph({a, b}) 

285 a.add_dependent(b) 

286 

287 c1 = g.checksum() 

288 assert g._checksum is not None 

289 

290 # Modifying internal node B should invalidate G's cache 

291 b.add_tag("internal-change") 

292 assert g._checksum is None 

293 assert g.checksum() != c1 

294 

295 def test_discover_pulls_in_external_nodes(self, nodes): 

296 a, b, c = nodes 

297 a.add_dependent(b) 

298 b.add_dependent(c) 

299 

300 # Graph only starts with A 

301 g = Graph({a}) 

302 assert len(g) == 1 

303 assert b not in g 

304 

305 # Discover should pull in B and C 

306 g.discover() 

307 assert len(g) == 3 

308 assert b in g 

309 assert c in g 

310 

311 # Now G is an observer for B and C 

312 g.checksum() 

313 assert g._checksum is not None 

314 c.add_tag("new-info") 

315 assert g._checksum is None 

316 

317 def test_add_edge_self_loop(self): 

318 a = Graphable("A") 

319 g = Graph() 

320 with raises(GraphCycleError) as excinfo: 

321 g.add_edge(a, a) 

322 assert "Self-loop" in str(excinfo.value) 

323 assert excinfo.value.cycle == [a, a] 

324 

325 def test_add_edge_simple_cycle(self): 

326 a = Graphable("A") 

327 b = Graphable("B") 

328 g = Graph() 

329 g.add_edge(a, b) 

330 with raises(GraphCycleError) as excinfo: 

331 g.add_edge(b, a) 

332 assert "create a cycle" in str(excinfo.value) 

333 assert excinfo.value.cycle is not None 

334 assert a in excinfo.value.cycle 

335 assert b in excinfo.value.cycle 

336 

337 def test_add_edge_complex_cycle(self): 

338 a = Graphable("A") 

339 b = Graphable("B") 

340 c = Graphable("C") 

341 g = Graph() 

342 g.add_edge(a, b) 

343 g.add_edge(b, c) 

344 with raises(GraphCycleError) as excinfo: 

345 g.add_edge(c, a) 

346 assert "create a cycle" in str(excinfo.value) 

347 assert excinfo.value.cycle is not None 

348 assert a in excinfo.value.cycle 

349 assert b in excinfo.value.cycle 

350 assert c in excinfo.value.cycle 

351 

352 def test_add_edge_cycle_with_shared_path(self): 

353 a, b, c, d, target = ( 

354 Graphable("A"), 

355 Graphable("B"), 

356 Graphable("C"), 

357 Graphable("D"), 

358 Graphable("Target"), 

359 ) 

360 graph_obj = Graph() 

361 graph_obj.add_edge(a, b) 

362 graph_obj.add_edge(a, c) 

363 graph_obj.add_edge(b, d) 

364 graph_obj.add_edge(c, d) 

365 graph_obj.add_edge(d, target) 

366 

367 assert a.find_path(target) is not None 

368 with raises(GraphCycleError): 

369 graph_obj.add_edge(target, a) 

370 

371 def test_add_node_with_existing_cycle(self): 

372 a = Graphable("A") 

373 b = Graphable("B") 

374 a._add_dependent(b) 

375 b._add_depends_on(a) 

376 b._add_dependent(a) 

377 a._add_depends_on(b) 

378 

379 g = Graph() 

380 with raises(GraphCycleError) as excinfo: 

381 g.add_node(a) 

382 assert "existing cycle" in str(excinfo.value) 

383 

384 def test_init_with_cycle(self): 

385 a = Graphable("A") 

386 b = Graphable("B") 

387 a._add_dependent(b) 

388 b._add_depends_on(a) 

389 b._add_dependent(a) 

390 a._add_depends_on(b) 

391 with raises(GraphCycleError): 

392 Graph(initial={a, b}) 

393 

394 def test_check_cycles_manual(self): 

395 a, b = Graphable("A"), Graphable("B") 

396 g = Graph() 

397 g.add_node(a) 

398 g.add_node(b) 

399 a._add_dependent(b) 

400 b._add_depends_on(a) 

401 b._add_dependent(a) 

402 a._add_depends_on(b) 

403 with raises(GraphCycleError): 

404 g.check_cycles() 

405 

406 def test_consistency_broken_depends_on(self): 

407 a, b = Graphable("A"), Graphable("B") 

408 a._add_depends_on(b) 

409 g = Graph() 

410 with raises(GraphConsistencyError): 

411 g.add_node(a) 

412 

413 def test_consistency_broken_dependents(self): 

414 a, b = Graphable("A"), Graphable("B") 

415 a._add_dependent(b) 

416 g = Graph() 

417 with raises(GraphConsistencyError): 

418 g.add_node(a) 

419 

420 def test_init_with_inconsistency(self): 

421 a, b = Graphable("A"), Graphable("B") 

422 a._add_depends_on(b) 

423 with raises(GraphConsistencyError): 

424 Graph(initial={a, b}) 

425 

426 def test_container_len(self, nodes): 

427 a, b, _ = nodes 

428 g = Graph() 

429 assert len(g) == 0 

430 g.add_node(a) 

431 assert len(g) == 1 

432 g.add_node(b) 

433 assert len(g) == 2 

434 

435 def test_container_iter(self, nodes): 

436 a, b, c = nodes 

437 g = Graph() 

438 g.add_edge(a, b) 

439 g.add_edge(b, c) 

440 assert list(g) == [a, b, c] 

441 

442 def test_container_contains(self, nodes): 

443 a, b, _ = nodes 

444 g = Graph() 

445 g.add_node(a) 

446 assert a in g 

447 assert "A" in g 

448 assert b not in g 

449 

450 def test_container_getitem(self, nodes): 

451 a, b, _ = nodes 

452 g = Graph() 

453 g.add_node(a) 

454 assert g["A"] == a 

455 with raises(KeyError): 

456 _ = g["B"] 

457 

458 def test_remove_edge(self, nodes): 

459 a, b, _ = nodes 

460 g = Graph() 

461 g.add_edge(a, b) 

462 g.remove_edge(a, b) 

463 assert b not in a.dependents 

464 assert a not in b.depends_on 

465 

466 def test_remove_node(self, nodes): 

467 a, b, c = nodes 

468 g = Graph() 

469 g.add_edge(a, b) 

470 g.add_edge(b, c) 

471 g.remove_node(b) 

472 assert b not in g 

473 assert b not in a.dependents 

474 

475 def test_ancestors_descendants(self): 

476 a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D") 

477 g = Graph() 

478 g.add_edge(a, b) 

479 g.add_edge(b, c) 

480 g.add_edge(c, d) 

481 assert set(g.ancestors(d)) == {a, b, c} 

482 assert set(g.descendants(a)) == {b, c, d} 

483 

484 def test_ancestors_diamond(self): 

485 a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D") 

486 g = Graph() 

487 g.add_edge(a, b) 

488 g.add_edge(a, c) 

489 g.add_edge(b, d) 

490 g.add_edge(c, d) 

491 assert set(g.ancestors(d)) == {a, b, c} 

492 assert set(g.descendants(a)) == {b, c, d} 

493 

494 def test_upstream_downstream_of(self): 

495 a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D") 

496 g = Graph() 

497 g.add_edge(a, b) 

498 g.add_edge(b, c) 

499 g.add_edge(c, d) 

500 

501 up = g.upstream_of(c) 

502 assert set(up.topological_order()) == {a, b, c} 

503 assert d not in up 

504 

505 down = g.downstream_of(b) 

506 assert set(down.topological_order()) == {b, c, d} 

507 assert a not in down 

508 

509 def test_cpm_and_longest_path(self): 

510 a = Graphable("A") 

511 b = Graphable("B") 

512 c = Graphable("C") 

513 d = Graphable("D") 

514 

515 a.duration = 2 

516 b.duration = 3 

517 c.duration = 1 

518 d.duration = 4 

519 

520 g = Graph() 

521 g.add_edge(a, b) 

522 g.add_edge(a, c) 

523 g.add_edge(b, d) 

524 g.add_edge(c, d) 

525 

526 # Paths: 

527 # A(2) -> B(3) -> D(4) = 9 

528 # A(2) -> C(1) -> D(4) = 7 

529 

530 analysis = g.cpm_analysis() 

531 assert analysis[a]["ES"] == 0 

532 assert analysis[a]["EF"] == 2 

533 assert analysis[b]["ES"] == 2 

534 assert analysis[b]["EF"] == 5 

535 assert analysis[c]["ES"] == 2 

536 assert analysis[c]["EF"] == 3 

537 assert analysis[d]["ES"] == 5 # max(EF(B), EF(C)) = max(5, 3) = 5 

538 assert analysis[d]["EF"] == 9 

539 

540 assert analysis[b]["slack"] == 0 

541 assert analysis[c]["slack"] == 2 

542 

543 cp = g.critical_path() 

544 assert a in cp 

545 assert b in cp 

546 assert d in cp 

547 assert c not in cp 

548 

549 lp = g.longest_path() 

550 assert lp == [a, b, d] 

551 

552 def test_all_paths(self): 

553 a = Graphable("A") 

554 b = Graphable("B") 

555 c = Graphable("C") 

556 d = Graphable("D") 

557 g = Graph() 

558 g.add_edge(a, b) 

559 g.add_edge(a, c) 

560 g.add_edge(b, d) 

561 g.add_edge(c, d) 

562 

563 paths = g.all_paths(a, d) 

564 assert len(paths) == 2 

565 assert [a, b, d] in paths 

566 assert [a, c, d] in paths 

567 

568 def test_suggest_cycle_breaks(self): 

569 a = Graphable("A") 

570 b = Graphable("B") 

571 c = Graphable("C") 

572 

573 # Manually create a cycle A -> B -> C -> A 

574 # We can't use g.add_edge because it prevents cycles. 

575 # We'll use a graph with manual links and g.add_node. 

576 a._add_dependent(b) 

577 b._add_depends_on(a) 

578 b._add_dependent(c) 

579 c._add_depends_on(b) 

580 c._add_dependent(a) 

581 a._add_depends_on(c) 

582 

583 g = Graph() 

584 # Use a list of nodes to bypass Graph constructor cycle check 

585 g._nodes = {a, b, c} 

586 

587 breaks = g.suggest_cycle_breaks() 

588 assert len(breaks) > 0 

589 u, v = breaks[0] 

590 # Any edge in the cycle is a valid break 

591 assert (u == a and v == b) or (u == b and v == c) or (u == c and v == a) 

592 

593 def test_subgraph_between(self): 

594 a, b, c, d, e = [Graphable(x) for x in "ABCDE"] 

595 g = Graph() 

596 g.add_edge(a, b) 

597 g.add_edge(b, d) 

598 g.add_edge(a, c) 

599 g.add_edge(c, d) 

600 g.add_edge(d, e) 

601 

602 # Paths from A to D are [A,B,D] and [A,C,D] 

603 sub = g.subgraph_between(a, d) 

604 assert set(sub.topological_order()) == {a, b, c, d} 

605 assert e not in sub 

606 

607 def test_diff_graph(self): 

608 a1, b1 = Graphable("A"), Graphable("B") 

609 g1 = Graph() 

610 g1.add_edge(a1, b1) 

611 

612 a2, c2 = Graphable("A"), Graphable("C") 

613 g2 = Graph() 

614 g2.add_edge(a2, c2) 

615 

616 dg = g1.diff_graph(g2) 

617 # dg should have A, B, C 

618 nodes = {n.reference: n for n in dg.topological_order()} 

619 assert "A" in nodes 

620 assert "B" in nodes 

621 assert "C" in nodes 

622 

623 assert nodes["B"].is_tagged("diff:removed") 

624 assert nodes["C"].is_tagged("diff:added") 

625 

626 # Check edges 

627 # A->B should be removed (red) 

628 # A->C should be added (green) 

629 assert nodes["A"].edge_attributes(nodes["B"])["color"] == "red" 

630 assert nodes["A"].edge_attributes(nodes["C"])["color"] == "green" 

631 

632 def test_transitive_closure(self): 

633 a, b, c = [Graphable(x) for x in "ABC"] 

634 g = Graph() 

635 g.add_edge(a, b) 

636 g.add_edge(b, c) 

637 

638 closure = g.transitive_closure() 

639 assert len(closure) == 3 

640 # Should have edges A->B, B->C, AND A->C 

641 nodes = {n.reference: n for n in closure.topological_order()} 

642 assert nodes["B"] in nodes["A"].dependents 

643 assert nodes["C"] in nodes["B"].dependents 

644 assert nodes["C"] in nodes["A"].dependents 

645 

646 def test_transitive_reduction_simple(self): 

647 a, b, c = Graphable("A"), Graphable("B"), Graphable("C") 

648 g = Graph() 

649 g.add_edge(a, b) 

650 g.add_edge(b, c) 

651 g.add_edge(a, c) 

652 reduced = g.transitive_reduction() 

653 assert len(reduced["A"].dependents) == 1 

654 

655 def test_transitive_reduction_diamond(self): 

656 a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D") 

657 g = Graph() 

658 g.add_edge(a, b) 

659 g.add_edge(b, d) 

660 g.add_edge(a, c) 

661 g.add_edge(c, d) 

662 g.add_edge(a, d) 

663 reduced = g.transitive_reduction() 

664 assert len(reduced["A"].dependents) == 2 

665 assert reduced["D"] not in reduced["A"].dependents 

666 

667 def test_transitive_reduction_preserves_tags(self): 

668 a, b = Graphable("A"), Graphable("B") 

669 a.add_tag("important") 

670 g = Graph() 

671 g.add_edge(a, b) 

672 reduced = g.transitive_reduction() 

673 assert "important" in reduced["A"].tags 

674 reduced["A"].add_tag("reduced-only") 

675 assert "reduced-only" not in g["A"].tags 

676 

677 def test_graph_render_convenience(self): 

678 a, b = Graphable("A"), Graphable("B") 

679 g = Graph() 

680 g.add_edge(a, b) 

681 from graphable.views.mermaid import create_topology_mermaid_mmd 

682 

683 out = g.render(create_topology_mermaid_mmd) 

684 assert "A --> B" in out 

685 

686 def test_graph_export_convenience(self, tmp_path): 

687 a, b = Graphable("A"), Graphable("B") 

688 g = Graph() 

689 g.add_edge(a, b) 

690 from graphable.views.mermaid import export_topology_mermaid_mmd 

691 

692 output_file = tmp_path / "graph.mmd" 

693 g.export(export_topology_mermaid_mmd, output=output_file) 

694 assert output_file.exists() 

695 

696 def test_checksum_deterministic(self): 

697 a1, b1 = Graphable("A"), Graphable("B") 

698 g1 = Graph() 

699 g1.add_edge(a1, b1) 

700 b2, a2 = Graphable("B"), Graphable("A") 

701 g2 = Graph() 

702 g2.add_edge(a2, b2) 

703 assert g1.checksum() == g2.checksum() 

704 

705 def test_parallelized_topological_order(self): 

706 a, b, c, d = Graphable("A"), Graphable("B"), Graphable("C"), Graphable("D") 

707 g = Graph() 

708 g.add_edge(a, b) 

709 g.add_edge(a, c) 

710 g.add_edge(b, d) 

711 g.add_edge(c, d) 

712 layers = g.parallelized_topological_order() 

713 assert len(layers) == 3 

714 assert layers[1] == {b, c} 

715 

716 def test_validate_checksum(self): 

717 a = Graphable("A") 

718 g = Graph() 

719 g.add_node(a) 

720 digest = g.checksum() 

721 assert g.validate_checksum(digest) is True 

722 

723 def test_read_write_auto_detect(self, tmp_path): 

724 a, b = Graphable("A"), Graphable("B") 

725 g: Graph[Graphable[str]] = Graph() 

726 g.add_edge(a, b) 

727 json_file = tmp_path / "graph.json" 

728 g.write(json_file) 

729 g_read = Graph.read(json_file) 

730 assert g == g_read 

731 

732 def test_standalone_checksum_io(self, tmp_path): 

733 a = Graphable("A") 

734 g: Graph[Graphable[str]] = Graph() 

735 g.add_node(a) 

736 sum_file = tmp_path / "graph.blake2b" 

737 g.write_checksum(sum_file) 

738 digest = Graph.read_checksum(sum_file) 

739 assert digest == g.checksum() 

740 

741 def test_embedded_checksum_io(self, tmp_path): 

742 a, b = Graphable("A"), Graphable("B") 

743 g: Graph[Graphable[str]] = Graph() 

744 g.add_edge(a, b) 

745 yaml_file = tmp_path / "embedded.yaml" 

746 g.write(yaml_file, embed_checksum=True) 

747 assert "blake2b:" in yaml_file.read_text() 

748 g_read = Graph.read(yaml_file) 

749 assert g == g_read 

750 

751 def test_embedded_checksum_json_wrapping(self, tmp_path): 

752 a = Graphable("A") 

753 g: Graph[Graphable[str]] = Graph() 

754 g.add_node(a) 

755 

756 json_file = tmp_path / "embedded.json" 

757 g.write(json_file, embed_checksum=True) 

758 

759 data = loads(json_file.read_text()) 

760 assert "checksum" in data 

761 assert "graph" in data 

762 assert data["graph"]["nodes"][0]["id"] == "A" 

763 

764 # Verify it can be read back 

765 g_read = Graph.read(json_file) 

766 assert g == g_read 

767 

768 def test_clone(self, nodes): 

769 a, b, _ = nodes 

770 g = Graph() 

771 g.add_edge(a, b, weight=5) 

772 a.add_tag("t1") 

773 

774 # Clone without edges 

775 c1 = g.clone(include_edges=False) 

776 assert len(c1) == 2 

777 assert len(c1["A"].dependents) == 0 

778 assert "t1" in c1["A"].tags 

779 assert c1["A"] is not a # Should be a copy 

780 

781 # Clone with edges 

782 c2 = g.clone(include_edges=True) 

783 assert len(c2) == 2 

784 assert len(c2["A"].dependents) == 1 

785 assert c2["A"].edge_attributes(c2["B"])["weight"] == 5 

786 

787 def test_checksum_includes_metadata(self, nodes): 

788 a, b, _ = nodes 

789 g = Graph({a, b}) 

790 g.add_edge(a, b, weight=10) 

791 

792 c1 = g.checksum() 

793 

794 # Change duration 

795 a.duration = 1.0 

796 c2 = g.checksum() 

797 assert c1 != c2 

798 

799 # Change status 

800 b.status = "completed" 

801 c3 = g.checksum() 

802 assert c2 != c3 

803 

804 # Change edge attribute 

805 a.set_edge_attribute(b, "weight", 20) 

806 c4 = g.checksum() 

807 assert c3 != c4 

808 

809 def test_export_fallback(self, tmp_path): 

810 g = Graph([Graphable("A")]) 

811 

812 # A function that is NOT in the CREATOR_MAP 

813 def mock_exporter(graph, path): 

814 with open(path, "w") as f: 

815 f.write("mock output") 

816 

817 output = tmp_path / "mock.txt" 

818 # Calling with embed_checksum=True should trigger the fallback warning and export normally 

819 g.export(mock_exporter, output, embed_checksum=True) 

820 assert output.read_text() == "mock output" 

821 

822 def test_bfs_dfs(self): 

823 a, b, c, d = [Graphable(x) for x in "ABCD"] 

824 g = Graph() 

825 g.add_edge(a, b) 

826 g.add_edge(a, c) 

827 g.add_edge(b, d) 

828 g.add_edge(c, d) 

829 

830 # BFS from A: A, then [B, C], then D 

831 bfs_nodes = list(g.bfs(a)) 

832 assert bfs_nodes[0] == a 

833 assert set(bfs_nodes[1:3]) == {b, c} 

834 assert bfs_nodes[3] == d 

835 

836 # DFS from A: A, then B... or C... 

837 dfs_nodes = list(g.dfs(a)) 

838 assert len(dfs_nodes) == 4 

839 assert dfs_nodes[0] == a 

840 

841 def test_write_unsupported_extension(self): 

842 g = Graph() 

843 with raises(ValueError, match="Unsupported extension: .invalid"): 

844 g.write("test.invalid") 

845 

846 @patch("graphable.views.utils.get_image_exporter") 

847 def test_write_image_logic(self, mock_get_exporter): 

848 mock_exporter = MagicMock() 

849 mock_get_exporter.return_value = mock_exporter 

850 g = Graph() 

851 g.write("test.png", engine="mermaid") 

852 mock_get_exporter.assert_called_with("mermaid") 

853 mock_exporter.assert_called_once() 

854 

855 def test_diff_graph_complex(self): 

856 # Coverage for modified/added edges in diff_graph 

857 g1 = Graph() 

858 a1 = Graphable("A") 

859 b1 = Graphable("B") 

860 g1.add_edge(a1, b1, weight=1) 

861 

862 g2 = Graph() 

863 a2 = Graphable("A") 

864 b2 = Graphable("B") 

865 c2 = Graphable("C") 

866 g2.add_edge(a2, b2, weight=2) # Modified edge 

867 g2.add_edge(b2, c2) # Added node and edge 

868 

869 dg = g1.diff_graph(g2) 

870 assert len(dg) == 3 

871 # Check for tags/attributes if possible, or just confirm it runs 

872 # The logic for color:orange and color:green should be hit 

873 

874 def test_parallelized_topological_order_filtered(self): 

875 g = Graph() 

876 a = Graphable("A") 

877 b = Graphable("B") 

878 g.add_edge(a, b) 

879 

880 # Filter to only include A 

881 order = g.parallelized_topological_order_filtered(lambda n: n.reference == "A") 

882 assert len(order) == 1 

883 assert list(order[0])[0].reference == "A" 

884 

885 def test_parallelized_topological_order_tagged(self): 

886 g = Graph() 

887 a = Graphable("A") 

888 a.add_tag("v1") 

889 b = Graphable("B") 

890 g.add_node(a) 

891 g.add_node(b) 

892 

893 order = g.parallelized_topological_order_tagged("v1") 

894 assert len(order) == 1 

895 assert list(order[0])[0].reference == "A"