|
1 | | -from __future__ import annotations |
2 | | - |
3 | | -import re |
4 | | -import logging |
5 | | -from dataclasses import dataclass |
6 | | -from typing import Dict, Optional, Set |
7 | | - |
8 | | -from grimp import exceptions |
9 | | -from grimp.application.ports.modulefinder import FoundPackage |
10 | | -from grimp.application.ports.filesystem import BasicFileSystem |
11 | | -from grimp.domain.valueobjects import DirectImport, Module |
12 | 1 | from grimp import _rustgrimp as rust # type: ignore[attr-defined] |
13 | 2 |
|
14 | | - |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches the run of leading dots at the start of a relative import name such as
# "..foo.bar". Group 1 captures the dots; at least one word character must follow,
# so a name consisting only of dots does not match.
_LEADING_DOT_REGEX = re.compile(r"^(\.+)\w")
18 | | - |
19 | | - |
@dataclass(frozen=True)
class _ImportedObject:
    """
    A single imported name found in a module's source, before it is resolved to a module.

    Instances are immutable and hashable, so they can be collected in a set.
    """

    # Dotted name exactly as written in the import; may start with dots if relative.
    name: str
    # Line on which the import statement appears.
    line_number: int
    # Text of that line.
    line_contents: str
    # Flag consulted when callers ask to exclude type-checking-only imports.
    typechecking_only: bool
26 | | - |
27 | | - |
class ImportScanner:
    """
    Scans the source of individual modules and reports the modules they directly import.

    The scanner never loads or executes the code it analyses; it reads the file via the
    supplied file system, parses the imports, and resolves each imported name against the
    set of modules known from the found packages.
    """

    def __init__(
        self,
        file_system: BasicFileSystem,
        found_packages: Set[FoundPackage],
        include_external_packages: bool = False,
    ) -> None:
        """
        Args:
            - found_packages:            Set of FoundPackages containing all the modules
                                         for analysis.
            - file_system:               The file system interface to use.
            - include_external_packages: Whether to include imports of external modules (i.e.
                                         modules not contained in any of found_packages)
                                         in the results.
        """
        self.file_system = file_system
        self.include_external_packages = include_external_packages
        self.found_packages = found_packages

        # Map every module to the package it was found in...
        self._found_packages_by_module: Dict[Module, FoundPackage] = {
            module_file.module: package
            for package in self.found_packages
            for module_file in package.module_files
        }
        # ...and keep the flattened set of all modules for membership checks.
        self.modules: Set[Module] = set(self._found_packages_by_module)

    def scan_for_imports(
        self, module: Module, *, exclude_type_checking_imports: bool = False
    ) -> Set[DirectImport]:
        """
        Return the direct imports made by the supplied module.

        Note: this method only analyses the module in question and will not load any other
        code, so it relies on self.modules to deduce which modules it imports. (This is
        because you can't know whether "from foo.bar import baz" is importing a module
        called `baz`, or a function `baz` from the module `bar`.)

        Args:
            module: The module to scan. Must belong to one of the found packages.
            exclude_type_checking_imports: If True, skip imported objects flagged as
                type-checking only.

        Raises:
            ValueError: if the module is not in any of the found packages.
            FileNotFoundError: if no source file can be located for the module.
            exceptions.SourceSyntaxError: if the module's source cannot be parsed.
        """
        found_package = self._found_package_for_module(module)
        module_filename = self._determine_module_filename(module, found_package)
        module_contents = self._read_module_contents(module_filename)

        try:
            imported_objects = self._get_raw_imported_objects(module_contents)
        except rust.ParseError as e:
            # Translate the parser's error into the package's own exception, keeping
            # the original as the explicit cause.
            raise exceptions.SourceSyntaxError(
                filename=module_filename,
                lineno=e.line_number,
                text=e.text,
            ) from e

        is_package = self._module_is_package(module_filename)

        imports = set()
        for imported_object in imported_objects:
            # Filter on `exclude_type_checking_imports`.
            if exclude_type_checking_imports and imported_object.typechecking_only:
                continue

            # Resolve relative imports.
            imported_object_name = self._get_absolute_imported_object_name(
                module=module, is_package=is_package, imported_object_name=imported_object.name
            )

            # Resolve imported module.
            imported_module = self._get_internal_module(imported_object_name, modules=self.modules)
            if imported_module is None:
                # => External import.

                # Filter on `self.include_external_packages`.
                if not self.include_external_packages:
                    continue

                # Distill module.
                imported_module = self._distill_external_module(
                    Module(imported_object_name), found_packages=self.found_packages
                )
                if imported_module is None:
                    continue

            imports.add(
                DirectImport(
                    importer=module,
                    imported=imported_module,
                    line_number=imported_object.line_number,
                    line_contents=imported_object.line_contents,
                )
            )
        return imports

    def _found_package_for_module(self, module: Module) -> FoundPackage:
        """
        Return the found package that contains the module.

        Raises:
            ValueError: if the module does not belong to any found package.
        """
        try:
            return self._found_packages_by_module[module]
        except KeyError:
            # The KeyError is an implementation detail of the lookup; don't surface it.
            raise ValueError(f"No found package for module {module}.") from None

    def _determine_module_filename(self, module: Module, found_package: FoundPackage) -> str:
        """
        Work out the full filename of the given module.

        Any given module can either be a straight Python file (foo.py) or else a package
        (in which case the file is an __init__.py within a directory).

        Raises:
            FileNotFoundError: if neither candidate file exists.
        """
        # Drop the components that name the package itself; what is left is the path
        # of the module relative to the package directory.
        n_package_components = len(found_package.name.split("."))
        leaf_components = module.name.split(".")[n_package_components:]

        filename_root = self.file_system.join(found_package.directory, *leaf_components)
        candidate_filenames = (
            f"{filename_root}.py",
            self.file_system.join(filename_root, "__init__.py"),
        )
        for candidate_filename in candidate_filenames:
            if self.file_system.exists(candidate_filename):
                return candidate_filename
        raise FileNotFoundError(f"Could not find module {module}.")

    def _read_module_contents(self, module_filename: str) -> str:
        """
        Read the file contents of the module.
        """
        return self.file_system.read(module_filename)

    def _module_is_package(self, module_filename: str) -> bool:
        """
        Whether or not the supplied module filename is a package.
        """
        return self.file_system.split(module_filename)[-1] == "__init__.py"

    @staticmethod
    def _get_raw_imported_objects(module_contents: str) -> Set[_ImportedObject]:
        """
        Parse the source code and return the imported objects, without resolving them.

        Each dictionary returned by the parser is expanded into the fields of
        an _ImportedObject.
        """
        imported_object_dicts = rust.parse_imported_objects_from_code(module_contents)
        return {_ImportedObject(**d) for d in imported_object_dicts}

    @staticmethod
    def _get_absolute_imported_object_name(
        *, module: Module, is_package: bool, imported_object_name: str
    ) -> str:
        """
        Turn a possibly-relative imported name into an absolute one.

        Names without leading dots are returned unchanged. Otherwise the dots are resolved
        relative to the importing module: one dot refers to the module's own package, and each
        additional dot climbs one level further.
        """
        leading_dot_match = _LEADING_DOT_REGEX.match(imported_object_name)
        if leading_dot_match is None:
            return imported_object_name

        n_leading_dots = len(leading_dot_match.group(1))
        # A package (__init__.py) is its own base for a single dot, so it climbs
        # one level fewer than a plain module does.
        levels_to_climb = n_leading_dots - 1 if is_package else n_leading_dots

        base_components = module.name.split(".")
        if levels_to_climb:
            # Guarded because a slice of [:-0] would be empty rather than unchanged.
            base_components = base_components[:-levels_to_climb]
        return ".".join(base_components) + "." + imported_object_name[n_leading_dots:]

    @staticmethod
    def _get_internal_module(object_name: str, *, modules: Set[Module]) -> Optional[Module]:
        """
        Return the internal module that the imported object name refers to, if any.

        The name may be a module itself, or an object defined in a module, so both the full
        name and its parent are tried. Returns None if neither is in `modules`.
        """
        candidate_module = Module(object_name)
        if candidate_module in modules:
            return candidate_module

        try:
            parent_module = candidate_module.parent
        except ValueError:
            # No parent to fall back on.
            return None
        return parent_module if parent_module in modules else None

    @staticmethod
    def _distill_external_module(
        module: Module, *, found_packages: Set[FoundPackage]
    ) -> Optional[Module]:
        """
        Given a module that we already know is external, turn it into a module to add to the graph.

        The 'distillation' process involves removing any unwanted subpackages. For example,
        Module("django.models.db") should be turned into simply Module("django").

        The process is more complex for potential namespace packages, as it's not possible to
        determine the portion package simply from the name. Rather than adding the overhead of a
        filesystem read, we just get the shallowest component that does not clash with an internal
        module namespace. Take, for example, a Module("foo.blue.alpha.one"). If one of the found
        packages is foo.blue.beta, the module will be distilled to Module("foo.blue.alpha").
        Alternatively, if the found package is foo.green, the distilled module will
        be Module("foo.blue").

        If the external name lies entirely underneath a found package (for example
        Module("foo.blue.missing.thing") when foo.blue is a found package), it is distilled to
        the first component below that package, i.e. Module("foo.blue.missing").

        Returns:
            The distilled module, or None if the module is a parent of one of the
            found packages.
        """
        # If it's a module that is a parent of one of the internal packages, return None
        # as it doesn't make sense and is probably an import of a namespace package.
        if any(Module(package.name).is_descendant_of(module) for package in found_packages):
            return None

        external_components = module.name.split(".")
        external_root = module.root

        # If it shares a namespace with an internal package, the portion is the shared prefix
        # plus one further component of the external name. Where several found packages share
        # a namespace with this module, use the deepest such portion, as we know that the
        # shallower ones will be namespaces too.
        deepest_portion_depth = 0
        for found_package in found_packages:
            root_module = Module(found_package.name)
            if not root_module.is_descendant_of(external_root):
                continue

            # Count the leading components the two names have in common. zip() stops at
            # the shorter name, so this cannot run off the end of either list.
            shared_depth = 0
            for internal_component, external_component in zip(
                root_module.name.split("."), external_components
            ):
                if internal_component != external_component:
                    break
                shared_depth += 1

            portion_depth = min(shared_depth + 1, len(external_components))
            deepest_portion_depth = max(deepest_portion_depth, portion_depth)

        if deepest_portion_depth:
            return Module(".".join(external_components[:deepest_portion_depth]))
        return external_root
| 3 | +ImportScanner = rust.ImportScanner |
0 commit comments