|
1 | | -import math |
2 | | -import os |
3 | | -from typing import Collection, Set, Dict, Iterable |
4 | | - |
5 | | -import joblib # type: ignore |
| 1 | +from typing import Collection, Set, Dict |
6 | 2 |
|
7 | 3 | from grimp import _rustgrimp as rust # type: ignore[attr-defined] |
8 | 4 | from grimp.domain.valueobjects import DirectImport, Module |
|
11 | 7 | from grimp.application.ports.modulefinder import ModuleFile, FoundPackage |
12 | 8 |
|
13 | 9 |
|
14 | | -# Calling code can set this environment variable if it wants to tune when to switch to |
15 | | -# multiprocessing, or set it to a large number to disable it altogether. |
16 | | -MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING_ENV_NAME = "GRIMP_MIN_MULTIPROCESSING_MODULES" |
17 | | -# This is an arbitrary number, but setting it too low slows down our functional tests considerably. |
18 | | -# If you change this, update docs/usage.rst too! |
19 | | -DEFAULT_MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING = 50 |
20 | | - |
21 | | - |
def scan_imports(
    module_files: Collection[ModuleFile],
    *,
    found_packages: Set[FoundPackage],
    include_external_packages: bool,
    exclude_type_checking_imports: bool,
) -> Dict[ModuleFile, Set[DirectImport]]:
    """
    Scan the supplied module files for their direct imports.

    Delegates the parsing work to the Rust extension, then re-keys the
    result by ModuleFile rather than by Module.

    Args:
        module_files: The module files to scan.
        found_packages: The packages the modules belong to.
        include_external_packages: Whether imports of external packages
            should be included in the results.
        exclude_type_checking_imports: Whether imports guarded by
            TYPE_CHECKING should be excluded from the results.

    Returns:
        A mapping of each supplied module file to the set of direct
        imports discovered within it.
    """
    file_system: AbstractFileSystem = settings.FILE_SYSTEM
    # Convert to the basic file system flavour that the Rust layer accepts.
    basic_file_system = file_system.convert_to_basic()

    # Materialize once up front; reused both for the Rust call and for
    # building the return mapping.
    module_files_tuple = tuple(module_files)

    imports_by_module: dict[Module, set[DirectImport]] = rust.scan_for_imports(
        module_files=module_files_tuple,
        found_packages=found_packages,
        # Ensure that the passed exclude_type_checking_imports is definitely a boolean,
        # otherwise the Rust class will error.
        include_external_packages=bool(include_external_packages),
        exclude_type_checking_imports=exclude_type_checking_imports,
        file_system=basic_file_system,
    )

    result: Dict[ModuleFile, Set[DirectImport]] = {}
    for module_file in module_files_tuple:
        result[module_file] = imports_by_module[module_file.module]
    return result
0 commit comments