|
1 | 1 | from __future__ import annotations |
| 2 | +import itertools |
2 | 3 | from typing import List, Optional, Sequence, Set, Tuple, TypedDict |
3 | 4 | from grimp.domain.analysis import PackageDependency, Route |
4 | 5 | from grimp.domain.valueobjects import Layer |
5 | | - |
| 6 | +import igraph as ig # type: ignore |
6 | 7 | from grimp import _rustgrimp as rust # type: ignore[attr-defined] |
7 | 8 | from grimp.exceptions import ( |
8 | 9 | ModuleNotPresent, |
@@ -444,8 +445,52 @@ def find_cycle_breakers(self, package: str) -> list[Import]: |
444 | 445 | """ |
445 | 446 | Identify a set of imports that, if removed, would make the package locally acyclic. |
446 | 447 | """ |
447 | | - # TODO |
448 | | - return [] |
| 448 | + children = self.find_children(package) |
| 449 | + if len(children) < 2: |
| 450 | + return [] |
| 451 | + igraph = ig.Graph(directed=True) |
| 452 | + igraph.add_vertices([package, *children]) |
| 453 | + edges: list[tuple[str, str]] = [] |
| 454 | + weights: list[int] = [] |
| 455 | + for downstream, upstream in itertools.permutations(children): |
| 456 | + total_imports = 0 |
| 457 | + for expression in ( |
| 458 | + f"{downstream} -> {upstream}", |
| 459 | + f"{downstream}.** -> {upstream}", |
| 460 | + f"{downstream} -> {upstream}.**", |
| 461 | + f"{downstream}.** -> {upstream}.**", |
| 462 | + ): |
| 463 | + total_imports += len(self.find_matching_direct_imports(expression)) |
| 464 | + if total_imports: |
| 465 | + edges.append((downstream, upstream)) |
| 466 | + weights.append(total_imports) |
| 467 | + |
| 468 | + igraph.add_edges(edges) |
| 469 | + igraph.es["weight"] = weights |
| 470 | + |
| 471 | + arc_set = igraph.feedback_arc_set(weights="weight") |
| 472 | + |
| 473 | + squashed_imports: list[Import] = [] |
| 474 | + for edge_id in arc_set: |
| 475 | + edge = igraph.es[edge_id] |
| 476 | + squashed_imports.append( |
| 477 | + { |
| 478 | + "importer": edge.source_vertex["name"], |
| 479 | + "imported": edge.target_vertex["name"], |
| 480 | + } |
| 481 | + ) |
| 482 | + |
| 483 | + unsquashed_imports: list[Import] = [] |
| 484 | + for squashed_import in squashed_imports: |
| 485 | + for pattern in ( |
| 486 | + f"{squashed_import['importer']} -> {squashed_import['imported']}", |
| 487 | + f"{squashed_import['importer']}.** -> {squashed_import['imported']}", |
| 488 | + f"{squashed_import['importer']} -> {squashed_import['imported']}.**", |
| 489 | + f"{squashed_import['importer']}.** -> {squashed_import['imported']}.**", |
| 490 | + ): |
| 491 | + unsquashed_imports.extend(self.find_matching_direct_imports(pattern)) |
| 492 | + |
| 493 | + return unsquashed_imports |
449 | 494 |
|
450 | 495 | # Dunder methods |
451 | 496 | # -------------- |
|
0 commit comments