diff --git a/build/baseline-7.4.neon b/build/baseline-7.4.neon index b28b9f9f5de..7dee4eae344 100644 --- a/build/baseline-7.4.neon +++ b/build/baseline-7.4.neon @@ -11,9 +11,9 @@ parameters: path: ../src/Parallel/ParallelAnalyser.php - - message: "#^Class PHPStan\\\\Parallel\\\\Process has an uninitialized property \\$process\\. Give it default value or assign it in the constructor\\.$#" + message: "#^Class PHPStan\\\\Parallel\\\\SpawnedProcess has an uninitialized property \\$process\\. Give it default value or assign it in the constructor\\.$#" count: 1 - path: ../src/Parallel/Process.php + path: ../src/Parallel/SpawnedProcess.php - message: "#^Class PHPStan\\\\PhpDoc\\\\ResolvedPhpDocBlock has an uninitialized property \\$phpDocNodes\\. Give it default value or assign it in the constructor\\.$#" count: 1 diff --git a/composer.json b/composer.json index af412db81f8..ce9335dc5aa 100644 --- a/composer.json +++ b/composer.json @@ -64,6 +64,10 @@ "phpstan/phpstan": "2.1.x", "symfony/polyfill-php73": "*" }, + "suggest": { + "ext-pcntl": "Enables forking parallel analysis workers from the already-booted process (experimental, skips the per-worker re-boot)", + "ext-posix": "Used together with ext-pcntl for forked parallel analysis workers" + }, "require-dev": { "cweagans/composer-patches": "^1.7.3", "php-parallel-lint/php-parallel-lint": "^1.2.0", diff --git a/src/Command/AnalyseCommand.php b/src/Command/AnalyseCommand.php index f7865ad7508..d2e39d33003 100644 --- a/src/Command/AnalyseCommand.php +++ b/src/Command/AnalyseCommand.php @@ -840,7 +840,7 @@ private function runFixer(InceptionResult $inceptionResult, Container $container $fixerApplication = $container->getByType(FixerApplication::class); return $fixerApplication->run( - $inceptionResult->getProjectConfigFile(), + $inceptionResult, $input, $output, count($files), diff --git a/src/Command/AnalyserRunner.php b/src/Command/AnalyserRunner.php index e0642493890..51149328b96 100644 --- a/src/Command/AnalyserRunner.php +++ b/src/Command/AnalyserRunner.php @@ -83,7 +83,7 @@ public function runAnalyser( if ($mainScript !== null && $schedule->getNumberOfProcesses() > 0) { $loop = new StreamSelectLoop(); $result = null; - $promise = $this->parallelAnalyser->analyse($loop, $schedule, $mainScript, $postFileCallback, $projectConfigFile, $tmpFile, $insteadOfFile, $input, null); + $promise = $this->parallelAnalyser->analyse($loop, $schedule, $allAnalysedFiles, $mainScript, $postFileCallback, $projectConfigFile, $tmpFile, $insteadOfFile, $input, null); $promise->then(static function (AnalyserResult $tmp) use (&$result): void { $result = $tmp; }); diff --git a/src/Command/FixerApplication.php b/src/Command/FixerApplication.php index 61f89f155be..d0deb1607ac 100644 --- a/src/Command/FixerApplication.php +++ b/src/Command/FixerApplication.php @@ -19,15 +19,19 @@ use PHPStan\File\FileMonitorResult; use PHPStan\File\FileReader; use PHPStan\File\FileWriter; +use PHPStan\File\PathNotFoundException; use PHPStan\Internal\ComposerHelper; use PHPStan\Internal\DirectoryCreator; use PHPStan\Internal\DirectoryCreatorException; use PHPStan\Internal\HttpClientFactory; +use PHPStan\Parallel\ForkParallelChecker; use PHPStan\PhpDoc\StubFilesProvider; +use PHPStan\Process\ForkedProcessPromise; use PHPStan\Process\ProcessCanceledException; use PHPStan\Process\ProcessCrashedException; use PHPStan\Process\ProcessHelper; use PHPStan\Process\ProcessPromise; +use PHPStan\Process\SpawnedProcessPromise; use PHPStan\ShouldNotHappenException; use React\ChildProcess\Process; use React\EventLoop\LoopInterface; @@ -94,18 +98,22 @@ public function __construct( #[AutowiredParameter] private string $usedLevel, private HttpClientFactory $httpClientFactory, + private ForkParallelChecker $forkParallelChecker, + private FixerWorkerRunner $fixerWorkerRunner, ) { } public function run( - ?string $projectConfigFile, + InceptionResult $inceptionResult, InputInterface $input, OutputInterface $output, int $filesCount, string $mainScript, ): int { + $projectConfigFile = $inceptionResult->getProjectConfigFile(); + $loop = new StreamSelectLoop(); $server = new TcpServer('127.0.0.1:0', $loop); /** @var string $serverAddress */ @@ -114,7 +122,7 @@ public function run( /** @var int<0, 65535> $serverPort */ $serverPort = parse_url($serverAddress, PHP_URL_PORT); - $server->on('connection', function (ConnectionInterface $connection) use ($loop, $projectConfigFile, $input, $output, $mainScript, $filesCount): void { + $server->on('connection', function (ConnectionInterface $connection) use ($loop, $inceptionResult, $projectConfigFile, $input, $output, $mainScript, $filesCount): void { // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; // phpcs:enable @@ -157,6 +165,7 @@ public function run( $this->analyse( $loop, + $inceptionResult, $mainScript, $projectConfigFile, $input, @@ -164,7 +173,7 @@ public function run( $encoder, ); - $this->monitorFileChanges($loop, function (FileMonitorResult $changes) use ($loop, $mainScript, $projectConfigFile, $input, $encoder, $output): void { + $this->monitorFileChanges($loop, function (FileMonitorResult $changes) use ($loop, $inceptionResult, $mainScript, $projectConfigFile, $input, $encoder, $output): void { if ($this->processInProgress !== null) { $this->processInProgress->cancel(); $this->processInProgress = null; @@ -178,6 +187,7 @@ public function run( $this->analyse( $loop, + $inceptionResult, $mainScript, $projectConfigFile, $input, @@ -417,6 +427,7 @@ private function monitorFileChanges(LoopInterface $loop, callable $hasChangesCal private function analyse( LoopInterface $loop, + InceptionResult $inceptionResult, string $mainScript, ?string $projectConfigFile, InputInterface $input, @@ -446,16 +457,16 @@ private function analyse( }); }); - $process = new ProcessPromise($loop, ProcessHelper::getWorkerCommand( + $process = $this->createProcessPromise( + $this->forkParallelChecker->isSupported(), + $loop, + $server, $mainScript, - 'fixer:worker', $projectConfigFile, - [ - '--server-port', - (string) $serverPort, - ], $input, - )); + $serverPort, + $inceptionResult, + ); $this->processInProgress = $process->run(); $this->processInProgress->then(function () use ($server): void { @@ -566,4 +577,51 @@ private function getStubFiles(): array return $stubFiles; } + /** + * @param int<0, 65535> $serverPort + */ + private function createProcessPromise( + bool $useFork, + LoopInterface $loop, + TcpServer $server, + string $mainScript, + ?string $projectConfigFile, + InputInterface $input, + int $serverPort, + InceptionResult $inceptionResult, + ): ProcessPromise + { + if ($useFork) { + try { + [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); + } catch (PathNotFoundException | InceptionNotSuccessfulException) { + throw new ShouldNotHappenException(); + } + + return new ForkedProcessPromise( + $loop, + $this->fixerWorkerRunner, + $server, + $inceptionResult->getErrorOutput(), + $inceptionFiles, + $isOnlyFiles, + $inceptionResult->getProjectConfigArray(), + $projectConfigFile, + $serverPort, + $input, + ); + } + + return new SpawnedProcessPromise($loop, ProcessHelper::getWorkerCommand( + $mainScript, + 'fixer:worker', + $projectConfigFile, + [ + '--server-port', + (string) $serverPort, + ], + $input, + )); + } + } diff --git a/src/Command/FixerWorkerCommand.php b/src/Command/FixerWorkerCommand.php index cfe2af34c9e..54c0528f0be 100644 --- a/src/Command/FixerWorkerCommand.php +++ b/src/Command/FixerWorkerCommand.php @@ -2,46 +2,17 @@ namespace PHPStan\Command; -use Clue\React\NDJson\Encoder; use Override; -use PHPStan\Analyser\AnalyserResult; -use PHPStan\Analyser\AnalyserResultFinalizer; -use PHPStan\Analyser\Error; -use PHPStan\Analyser\Ignore\IgnoredErrorHelper; -use PHPStan\Analyser\Ignore\IgnoredErrorHelperResult; -use PHPStan\Analyser\InternalError; -use PHPStan\Analyser\ResultCache\ResultCacheManager; -use PHPStan\Analyser\ResultCache\ResultCacheManagerFactory; -use PHPStan\DependencyInjection\Container; use PHPStan\File\PathNotFoundException; -use PHPStan\Parallel\ParallelAnalyser; -use PHPStan\Parallel\Scheduler; -use PHPStan\Process\CpuCoreCounter; use PHPStan\ShouldNotHappenException; -use React\EventLoop\LoopInterface; -use React\EventLoop\StreamSelectLoop; -use React\Promise\PromiseInterface; -use React\Socket\ConnectionInterface; -use React\Socket\TcpConnector; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use function array_diff; -use function array_key_exists; -use function count; -use function filemtime; -use function in_array; use function is_array; use function is_bool; -use function is_file; use function is_string; -use function memory_get_peak_usage; -use function React\Promise\resolve; -use function sprintf; -use function usort; -use const JSON_INVALID_UTF8_IGNORE; final class FixerWorkerCommand extends Command { @@ -121,315 +92,25 @@ protected function execute(InputInterface $input, OutputInterface $output): int $container = $inceptionResult->getContainer(); - /** @var IgnoredErrorHelper $ignoredErrorHelper */ - $ignoredErrorHelper = $container->getByType(IgnoredErrorHelper::class); - $ignoredErrorHelperResult = $ignoredErrorHelper->initialize(); - if (count($ignoredErrorHelperResult->getErrors()) > 0) { + try { + [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); + } catch (PathNotFoundException | InceptionNotSuccessfulException) { throw new ShouldNotHappenException(); } - $loop = new StreamSelectLoop(); - $tcpConnector = new TcpConnector($loop); - $tcpConnector->connect(sprintf('127.0.0.1:%d', (int) $serverPort))->then(function (ConnectionInterface $connection) use ($container, $inceptionResult, $configuration, $input, $ignoredErrorHelperResult, $loop): void { - // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly - $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; - // phpcs:enable - $out = new Encoder($connection, $jsonInvalidUtf8Ignore); - //$in = new Decoder($connection, true, 512, $jsonInvalidUtf8Ignore, 128 * 1024 * 1024); - - /** @var ResultCacheManager $resultCacheManager */ - $resultCacheManager = $container->getByType(ResultCacheManagerFactory::class)->create([]); - $projectConfigArray = $inceptionResult->getProjectConfigArray(); - - /** @var AnalyserResultFinalizer $analyserResultFinalizer */ - $analyserResultFinalizer = $container->getByType(AnalyserResultFinalizer::class); - - try { - [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); - } catch (PathNotFoundException | InceptionNotSuccessfulException) { - throw new ShouldNotHappenException(); - } - - $out->write([ - 'action' => 'analysisStart', - 'result' => [ - 'analysedFiles' => $inceptionFiles, - ], - ]); - - $resultCache = $resultCacheManager->restore($inceptionFiles, false, false, $projectConfigArray, $inceptionResult->getErrorOutput()); - - $errorsFromResultCacheTmp = $resultCache->getErrors(); - $locallyIgnoredErrorsFromResultCacheTmp = $resultCache->getLocallyIgnoredErrors(); - foreach ($resultCache->getFilesToAnalyse() as $fileToAnalyse) { - unset($errorsFromResultCacheTmp[$fileToAnalyse]); - unset($locallyIgnoredErrorsFromResultCacheTmp[$fileToAnalyse]); - } - - $errorsFromResultCache = []; - foreach ($errorsFromResultCacheTmp as $errorsByFile) { - foreach ($errorsByFile as $error) { - $errorsFromResultCache[] = $error; - } - } - - [$errorsFromResultCache, $ignoredErrorsFromResultCache] = $this->filterErrors($errorsFromResultCache, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); - - foreach ($locallyIgnoredErrorsFromResultCacheTmp as $locallyIgnoredErrors) { - foreach ($locallyIgnoredErrors as $locallyIgnoredError) { - $ignoredErrorsFromResultCache[] = [$locallyIgnoredError, null]; - } - } - - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $errorsFromResultCache, - 'ignoredErrors' => $ignoredErrorsFromResultCache, - 'analysedFiles' => array_diff($inceptionFiles, $resultCache->getFilesToAnalyse()), - ], - ]); - - $filesToAnalyse = $resultCache->getFilesToAnalyse(); - usort($filesToAnalyse, static function (string $a, string $b): int { - $aTime = @filemtime($a); - if ($aTime === false) { - return 1; - } - - $bTime = @filemtime($b); - if ($bTime === false) { - return -1; - } - - // files are sorted from the oldest - // because ParallelAnalyser reverses the scheduler jobs to do the smallest - // jobs first - return $aTime <=> $bTime; - }); - - $this->runAnalyser( - $loop, - $container, - $filesToAnalyse, - $configuration, - $input, - function (array $errors, array $locallyIgnoredErrors, array $analysedFiles) use ($out, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles): void { - $internalErrors = []; - foreach ($errors as $fileSpecificError) { - if (!$fileSpecificError->hasNonIgnorableException()) { - continue; - } - - $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); - } - - if (count($internalErrors) > 0) { - $out->write(['action' => 'analysisCrash', 'data' => [ - 'internalErrors' => $internalErrors, - ]]); - return; - } - - [$errors, $ignoredErrors] = $this->filterErrors($errors, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); - foreach ($locallyIgnoredErrors as $locallyIgnoredError) { - $ignoredErrors[] = [$locallyIgnoredError, null]; - } - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $errors, - 'ignoredErrors' => $ignoredErrors, - 'analysedFiles' => $analysedFiles, - ], - ]); - }, - )->then(function (AnalyserResult $intermediateAnalyserResult) use ($analyserResultFinalizer, $resultCacheManager, $resultCache, $inceptionResult, $isOnlyFiles, $ignoredErrorHelperResult, $inceptionFiles, $out): void { - $analyserResult = $resultCacheManager->process( - $intermediateAnalyserResult, - $resultCache, - $inceptionResult->getErrorOutput(), - false, - true, - )->getAnalyserResult(); - $finalizerResult = $analyserResultFinalizer->finalize($analyserResult, $isOnlyFiles, false); - - $internalErrors = []; - foreach ($finalizerResult->getAnalyserResult()->getInternalErrors() as $internalError) { - $internalErrors[] = new InternalError( - $internalError->getTraceAsString() !== null ? sprintf('Internal error: %s', $internalError->getMessage()) : $internalError->getMessage(), - $internalError->getContextDescription(), - $internalError->getTrace(), - $internalError->getTraceAsString(), - $internalError->shouldReportBug(), - ); - } - - foreach ($finalizerResult->getAnalyserResult()->getUnorderedErrors() as $fileSpecificError) { - if (!$fileSpecificError->hasNonIgnorableException()) { - continue; - } - - $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); - } - - $hasInternalErrors = count($internalErrors) > 0 || $finalizerResult->getAnalyserResult()->hasReachedInternalErrorsCountLimit(); - - if ($hasInternalErrors) { - $out->write(['action' => 'analysisCrash', 'data' => [ - 'internalErrors' => count($internalErrors) > 0 ? $internalErrors : [ - new InternalError( - 'Internal error occurred', - 'running analyser in PHPStan Pro worker', - trace: [], - traceAsString: null, - shouldReportBug: false, - ), - ], - ]]); - } - - [$collectorErrors, $ignoredCollectorErrors] = $this->filterErrors($finalizerResult->getCollectorErrors(), $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, $hasInternalErrors); - foreach ($finalizerResult->getLocallyIgnoredCollectorErrors() as $locallyIgnoredCollectorError) { - $ignoredCollectorErrors[] = [$locallyIgnoredCollectorError, null]; - } - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $collectorErrors, - 'ignoredErrors' => $ignoredCollectorErrors, - 'analysedFiles' => [], - ], - ]); - - $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process( - $finalizerResult->getErrors(), - $isOnlyFiles, - $inceptionFiles, - $hasInternalErrors, - ); - $ignoreFileErrors = []; - foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { - if ($error->getIdentifier() === null) { - continue; - } - if (!in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched', 'ignore.unmatchedLine', 'ignore.unmatchedIdentifier', 'ignore.noComment'], true)) { - continue; - } - $ignoreFileErrors[] = $error; - } - - $out->end([ - 'action' => 'analysisEnd', - 'result' => [ - 'ignoreFileErrors' => $ignoreFileErrors, - 'ignoreNotFileErrors' => $ignoredErrorHelperProcessedResult->getOtherIgnoreMessages(), - ], - ]); - }); - }); - $loop->run(); - - return 0; - } - - private function transformErrorIntoInternalError(Error $error): InternalError - { - $message = $error->getMessage(); - $metadata = $error->getMetadata(); - if ( - $error->getIdentifier() === 'phpstan.internal' - && array_key_exists(InternalError::STACK_TRACE_AS_STRING_METADATA_KEY, $metadata) - ) { - $message = sprintf('Internal error: %s', $message); - } - - return new InternalError( - $message, - sprintf('analysing file %s', $error->getTraitFilePath() ?? $error->getFilePath()), - $metadata[InternalError::STACK_TRACE_METADATA_KEY] ?? [], - $metadata[InternalError::STACK_TRACE_AS_STRING_METADATA_KEY] ?? null, - shouldReportBug: true, - ); - } - - /** - * @param string[] $inceptionFiles - * @param list $errors - * @return array{list, list} - */ - private function filterErrors(array $errors, IgnoredErrorHelperResult $ignoredErrorHelperResult, bool $onlyFiles, array $inceptionFiles, bool $hasInternalErrors): array - { - $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process($errors, $onlyFiles, $inceptionFiles, $hasInternalErrors); - $finalErrors = []; - foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { - if ($error->getIdentifier() === null) { - $finalErrors[] = $error; - continue; - } - if (in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched'], true)) { - continue; - } - $finalErrors[] = $error; - } - - return [ - $finalErrors, - $ignoredErrorHelperProcessedResult->getIgnoredErrors(), - ]; - } - - /** - * @param string[] $files - * @param callable(list, list, string[]): void $onFileAnalysisHandler - * @return PromiseInterface - */ - private function runAnalyser(LoopInterface $loop, Container $container, array $files, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface - { - /** @var ParallelAnalyser $parallelAnalyser */ - $parallelAnalyser = $container->getByType(ParallelAnalyser::class); - $filesCount = count($files); - if ($filesCount === 0) { - return resolve(new AnalyserResult( - unorderedErrors: [], - filteredPhpErrors: [], - allPhpErrors: [], - locallyIgnoredErrors: [], - linesToIgnore: [], - unmatchedLineIgnores: [], - internalErrors: [], - collectedData: [], - dependencies: [], - usedTraitDependencies: [], - exportedNodes: [], - reachedInternalErrorsCountLimit: false, - peakMemoryUsageBytes: memory_get_peak_usage(true), - processedFiles: [], - )); - } - - /** @var Scheduler $scheduler */ - $scheduler = $container->getByType(Scheduler::class); - - /** @var CpuCoreCounter $cpuCoreCounter */ - $cpuCoreCounter = $container->getByType(CpuCoreCounter::class); - - $schedule = $scheduler->scheduleWork($cpuCoreCounter->getNumberOfCpuCores(), $files); - $mainScript = null; - if (isset($_SERVER['argv'][0]) && is_file($_SERVER['argv'][0])) { - $mainScript = $_SERVER['argv'][0]; - } + // Everything after the boot lives in FixerWorkerRunner so a + // pcntl_fork()-ed child can reuse it without re-booting (see + // FixerApplication). + $fixerWorkerRunner = $container->getByType(FixerWorkerRunner::class); - return $parallelAnalyser->analyse( - $loop, - $schedule, - $mainScript, - null, + return $fixerWorkerRunner->run( + $inceptionResult->getErrorOutput(), + $inceptionFiles, + $isOnlyFiles, + $inceptionResult->getProjectConfigArray(), $configuration, - null, - null, + (int) $serverPort, $input, - $onFileAnalysisHandler, ); } diff --git a/src/Command/FixerWorkerRunner.php b/src/Command/FixerWorkerRunner.php new file mode 100644 index 00000000000..9f70596ae82 --- /dev/null +++ b/src/Command/FixerWorkerRunner.php @@ -0,0 +1,374 @@ +ignoredErrorHelper->initialize(); + if (count($ignoredErrorHelperResult->getErrors()) > 0) { + throw new ShouldNotHappenException(); + } + + // Always a fresh event loop: in a forked child the parent's inherited + // loop must never be touched. + $loop = new StreamSelectLoop(); + $tcpConnector = new TcpConnector($loop); + $tcpConnector->connect(sprintf('127.0.0.1:%d', $serverPort))->then(function (ConnectionInterface $connection) use ($errorOutput, $inceptionFiles, $isOnlyFiles, $projectConfigArray, $configuration, $input, $ignoredErrorHelperResult, $loop): void { + // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly + $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; + // phpcs:enable + $out = new Encoder($connection, $jsonInvalidUtf8Ignore); + //$in = new Decoder($connection, true, 512, $jsonInvalidUtf8Ignore, 128 * 1024 * 1024); + + /** @var ResultCacheManager $resultCacheManager */ + $resultCacheManager = $this->resultCacheManagerFactory->create([]); + + $out->write([ + 'action' => 'analysisStart', + 'result' => [ + 'analysedFiles' => $inceptionFiles, + ], + ]); + + $resultCache = $resultCacheManager->restore($inceptionFiles, false, false, $projectConfigArray, $errorOutput); + + $errorsFromResultCacheTmp = $resultCache->getErrors(); + $locallyIgnoredErrorsFromResultCacheTmp = $resultCache->getLocallyIgnoredErrors(); + foreach ($resultCache->getFilesToAnalyse() as $fileToAnalyse) { + unset($errorsFromResultCacheTmp[$fileToAnalyse]); + unset($locallyIgnoredErrorsFromResultCacheTmp[$fileToAnalyse]); + } + + $errorsFromResultCache = []; + foreach ($errorsFromResultCacheTmp as $errorsByFile) { + foreach ($errorsByFile as $error) { + $errorsFromResultCache[] = $error; + } + } + + [$errorsFromResultCache, $ignoredErrorsFromResultCache] = $this->filterErrors($errorsFromResultCache, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); + + foreach ($locallyIgnoredErrorsFromResultCacheTmp as $locallyIgnoredErrors) { + foreach ($locallyIgnoredErrors as $locallyIgnoredError) { + $ignoredErrorsFromResultCache[] = [$locallyIgnoredError, null]; + } + } + + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $errorsFromResultCache, + 'ignoredErrors' => $ignoredErrorsFromResultCache, + 'analysedFiles' => array_diff($inceptionFiles, $resultCache->getFilesToAnalyse()), + ], + ]); + + $filesToAnalyse = $resultCache->getFilesToAnalyse(); + usort($filesToAnalyse, static function (string $a, string $b): int { + $aTime = @filemtime($a); + if ($aTime === false) { + return 1; + } + + $bTime = @filemtime($b); + if ($bTime === false) { + return -1; + } + + // files are sorted from the oldest + // because ParallelAnalyser reverses the scheduler jobs to do the smallest + // jobs first + return $aTime <=> $bTime; + }); + + $this->runAnalyser( + $loop, + $filesToAnalyse, + $inceptionFiles, + $configuration, + $input, + function (array $errors, array $locallyIgnoredErrors, array $analysedFiles) use ($out, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles): void { + $internalErrors = []; + foreach ($errors as $fileSpecificError) { + if (!$fileSpecificError->hasNonIgnorableException()) { + continue; + } + + $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); + } + + if (count($internalErrors) > 0) { + $out->write(['action' => 'analysisCrash', 'data' => [ + 'internalErrors' => $internalErrors, + ]]); + return; + } + + [$errors, $ignoredErrors] = $this->filterErrors($errors, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); + foreach ($locallyIgnoredErrors as $locallyIgnoredError) { + $ignoredErrors[] = [$locallyIgnoredError, null]; + } + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $errors, + 'ignoredErrors' => $ignoredErrors, + 'analysedFiles' => $analysedFiles, + ], + ]); + }, + )->then(function (AnalyserResult $intermediateAnalyserResult) use ($resultCacheManager, $resultCache, $errorOutput, $isOnlyFiles, $ignoredErrorHelperResult, $inceptionFiles, $out): void { + $analyserResult = $resultCacheManager->process( + $intermediateAnalyserResult, + $resultCache, + $errorOutput, + false, + true, + )->getAnalyserResult(); + $finalizerResult = $this->analyserResultFinalizer->finalize($analyserResult, $isOnlyFiles, false); + + $internalErrors = []; + foreach ($finalizerResult->getAnalyserResult()->getInternalErrors() as $internalError) { + $internalErrors[] = new InternalError( + $internalError->getTraceAsString() !== null ? sprintf('Internal error: %s', $internalError->getMessage()) : $internalError->getMessage(), + $internalError->getContextDescription(), + $internalError->getTrace(), + $internalError->getTraceAsString(), + $internalError->shouldReportBug(), + ); + } + + foreach ($finalizerResult->getAnalyserResult()->getUnorderedErrors() as $fileSpecificError) { + if (!$fileSpecificError->hasNonIgnorableException()) { + continue; + } + + $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); + } + + $hasInternalErrors = count($internalErrors) > 0 || $finalizerResult->getAnalyserResult()->hasReachedInternalErrorsCountLimit(); + + if ($hasInternalErrors) { + $out->write(['action' => 'analysisCrash', 'data' => [ + 'internalErrors' => count($internalErrors) > 0 ? $internalErrors : [ + new InternalError( + 'Internal error occurred', + 'running analyser in PHPStan Pro worker', + trace: [], + traceAsString: null, + shouldReportBug: false, + ), + ], + ]]); + } + + [$collectorErrors, $ignoredCollectorErrors] = $this->filterErrors($finalizerResult->getCollectorErrors(), $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, $hasInternalErrors); + foreach ($finalizerResult->getLocallyIgnoredCollectorErrors() as $locallyIgnoredCollectorError) { + $ignoredCollectorErrors[] = [$locallyIgnoredCollectorError, null]; + } + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $collectorErrors, + 'ignoredErrors' => $ignoredCollectorErrors, + 'analysedFiles' => [], + ], + ]); + + $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process( + $finalizerResult->getErrors(), + $isOnlyFiles, + $inceptionFiles, + $hasInternalErrors, + ); + $ignoreFileErrors = []; + foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { + if ($error->getIdentifier() === null) { + continue; + } + if (!in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched', 'ignore.unmatchedLine', 'ignore.unmatchedIdentifier', 'ignore.noComment'], true)) { + continue; + } + $ignoreFileErrors[] = $error; + } + + $out->end([ + 'action' => 'analysisEnd', + 'result' => [ + 'ignoreFileErrors' => $ignoreFileErrors, + 'ignoreNotFileErrors' => $ignoredErrorHelperProcessedResult->getOtherIgnoreMessages(), + ], + ]); + }); + }); + $loop->run(); + + return 0; + } + + private function transformErrorIntoInternalError(Error $error): InternalError + { + $message = $error->getMessage(); + $metadata = $error->getMetadata(); + if ( + $error->getIdentifier() === 'phpstan.internal' + && array_key_exists(InternalError::STACK_TRACE_AS_STRING_METADATA_KEY, $metadata) + ) { + $message = sprintf('Internal error: %s', $message); + } + + return new InternalError( + $message, + sprintf('analysing file %s', $error->getTraitFilePath() ?? $error->getFilePath()), + $metadata[InternalError::STACK_TRACE_METADATA_KEY] ?? [], + $metadata[InternalError::STACK_TRACE_AS_STRING_METADATA_KEY] ?? null, + shouldReportBug: true, + ); + } + + /** + * @param string[] $inceptionFiles + * @param list $errors + * @return array{list, list} + */ + private function filterErrors(array $errors, IgnoredErrorHelperResult $ignoredErrorHelperResult, bool $onlyFiles, array $inceptionFiles, bool $hasInternalErrors): array + { + $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process($errors, $onlyFiles, $inceptionFiles, $hasInternalErrors); + $finalErrors = []; + foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { + if ($error->getIdentifier() === null) { + $finalErrors[] = $error; + continue; + } + if (in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched'], true)) { + continue; + } + $finalErrors[] = $error; + } + + return [ + $finalErrors, + $ignoredErrorHelperProcessedResult->getIgnoredErrors(), + ]; + } + + /** + * @param string[] $files + * @param string[] $allAnalysedFiles + * @param callable(list, list, string[]): void $onFileAnalysisHandler + * @return PromiseInterface + */ + private function runAnalyser(LoopInterface $loop, array $files, array $allAnalysedFiles, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface + { + $filesCount = count($files); + if ($filesCount === 0) { + return resolve(new AnalyserResult( + unorderedErrors: [], + filteredPhpErrors: [], + allPhpErrors: [], + locallyIgnoredErrors: [], + linesToIgnore: [], + unmatchedLineIgnores: [], + internalErrors: [], + collectedData: [], + dependencies: [], + usedTraitDependencies: [], + exportedNodes: [], + reachedInternalErrorsCountLimit: false, + peakMemoryUsageBytes: memory_get_peak_usage(true), + processedFiles: [], + )); + } + + $schedule = $this->scheduler->scheduleWork($this->cpuCoreCounter->getNumberOfCpuCores(), $files); + $mainScript = null; + if (isset($_SERVER['argv'][0]) && is_file($_SERVER['argv'][0])) { + $mainScript = $_SERVER['argv'][0]; + } + + return $this->parallelAnalyser->analyse( + $loop, + $schedule, + $allAnalysedFiles, + $mainScript, + null, + $configuration, + null, + null, + $input, + $onFileAnalysisHandler, + ); + } + +} diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index d5f42b87956..a1dbab37f50 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -2,38 +2,18 @@ namespace PHPStan\Command; -use Clue\React\NDJson\Decoder; -use Clue\React\NDJson\Encoder; use Override; -use PHPStan\Analyser\FileAnalyser; -use PHPStan\Analyser\InternalError; -use PHPStan\Analyser\NodeScopeResolver; -use PHPStan\Collectors\Registry as CollectorRegistry; -use PHPStan\DependencyInjection\Container; use PHPStan\File\PathNotFoundException; -use PHPStan\Rules\Registry as RuleRegistry; +use PHPStan\Parallel\WorkerRunner; use PHPStan\ShouldNotHappenException; -use React\EventLoop\StreamSelectLoop; -use React\Socket\ConnectionInterface; -use React\Socket\TcpConnector; -use React\Stream\ReadableStreamInterface; -use React\Stream\WritableStreamInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Throwable; -use function array_fill_keys; -use function array_filter; -use function array_merge; -use function array_unshift; -use function array_values; -use function defined; use function is_array; use function is_bool; use function is_string; -use function memory_get_peak_usage; use function sprintf; final class WorkerCommand extends Command @@ -41,8 +21,6 @@ final class WorkerCommand extends Command private const NAME = 'worker'; - private int $errorCount = 0; - /** * @param string[] $composerAutoloaderProjectPaths */ @@ -122,13 +100,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int } catch (InceptionNotSuccessfulException $e) { return 1; } - $loop = new StreamSelectLoop(); $container = $inceptionResult->getContainer(); try { [$analysedFiles] = $inceptionResult->getFiles(); - $analysedFiles = $this->switchTmpFile($analysedFiles, $insteadOfFile, $tmpFile); } catch (PathNotFoundException $e) { $inceptionResult->getErrorOutput()->writeLineFormatted(sprintf('%s', $e->getMessage())); return 1; @@ -136,182 +112,18 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } - $nodeScopeResolver = $container->getByType(NodeScopeResolver::class); - $nodeScopeResolver->setAnalysedFiles($analysedFiles); - - $analysedFiles = array_fill_keys($analysedFiles, true); - - $tcpConnector = new TcpConnector($loop); - $tcpConnector->connect(sprintf('127.0.0.1:%d', (int) $port))->then(function (ConnectionInterface $connection) use ($container, $identifier, $output, $analysedFiles, $tmpFile, $insteadOfFile): void { - // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly - $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; - // phpcs:enable - $out = new Encoder($connection, $jsonInvalidUtf8Ignore); - $in = new Decoder($connection, true, options: $jsonInvalidUtf8Ignore, maxlength: $container->getParameter('parallel')['buffer']); - $out->write(['action' => 'hello', 'identifier' => $identifier]); - $this->runWorker($container, $out, $in, $output, $analysedFiles, $tmpFile, $insteadOfFile); - }); - - $loop->run(); - - if ($this->errorCount > 0) { - return 1; - } - - return 0; - } - - /** - * @param array $analysedFiles - */ - private function runWorker( - Container $container, - WritableStreamInterface $out, - ReadableStreamInterface $in, - OutputInterface $output, - array $analysedFiles, - ?string $tmpFile, - ?string $insteadOfFile, - ): void - { - $handleError = function (Throwable $error) use ($out, $output): void { - $this->errorCount++; - $output->writeln(sprintf('Error: %s', $error->getMessage())); - $out->write([ - 'action' => 'result', - 'result' => [ - 'errors' => [], - 'internalErrors' => [ - new InternalError( - $error->getMessage(), - 'communicating with main process in parallel worker', - InternalError::prepareTrace($error), - $error->getTraceAsString(), - shouldReportBug: true, - ), - ], - 'filteredPhpErrors' => [], - 'allPhpErrors' => [], - 'locallyIgnoredErrors' => [], - 'linesToIgnore' => [], - 'unmatchedLineIgnores' => [], - 'collectedData' => [], - 'memoryUsage' => memory_get_peak_usage(true), - 'dependencies' => [], - 'exportedNodes' => [], - 'files' => [], - 'internalErrorsCount' => 1, - ], - ]); - $out->end(); - }; - $out->on('error', $handleError); - $fileAnalyser = $container->getByType(FileAnalyser::class); - $ruleRegistry = $container->getByType(RuleRegistry::class); - $collectorRegistry = $container->getByType(CollectorRegistry::class); - $in->on('data', static function (array $json) use ($fileAnalyser, $ruleRegistry, $collectorRegistry, $out, $analysedFiles, $tmpFile, $insteadOfFile): void { - $action = $json['action']; - if ($action !== 'analyse') { - return; - } - - $internalErrorsCount = 0; - $files = $json['files']; - $errors = []; - $internalErrors = []; - $filteredPhpErrors = []; - $allPhpErrors = []; - $locallyIgnoredErrors = []; - $linesToIgnore = []; - $unmatchedLineIgnores = []; - $collectedData = []; - $dependencies = []; - $usedTraitDependencies = []; - $exportedNodes = []; - $processedFiles = []; - foreach ($files as $file) { - try { - if ($file === $insteadOfFile) { - $file = $tmpFile; - } - $fileAnalyserResult = $fileAnalyser->analyseFile($file, $analysedFiles, $ruleRegistry, $collectorRegistry, null); - $fileErrors = $fileAnalyserResult->getErrors(); - $filteredPhpErrors = array_merge($filteredPhpErrors, $fileAnalyserResult->getFilteredPhpErrors()); - $allPhpErrors = array_merge($allPhpErrors, $fileAnalyserResult->getAllPhpErrors()); - $linesToIgnore[$file] = $fileAnalyserResult->getLinesToIgnore(); - $unmatchedLineIgnores[$file] = $fileAnalyserResult->getUnmatchedLineIgnores(); - $dependencies[$file] = $fileAnalyserResult->getDependencies(); - $usedTraitDependencies[$file] = $fileAnalyserResult->getUsedTraitDependencies(); - $exportedNodes[$file] = $fileAnalyserResult->getExportedNodes(); - $processedFiles = array_merge($processedFiles, $fileAnalyserResult->getProcessedFiles()); - foreach ($fileErrors as $fileError) { - $errors[] = $fileError; - } - foreach ($fileAnalyserResult->getLocallyIgnoredErrors() as $locallyIgnoredError) { - $locallyIgnoredErrors[] = $locallyIgnoredError; - } - foreach ($fileAnalyserResult->getCollectedData() as $collectedFile => $dataPerCollector) { - foreach ($dataPerCollector as $collectorType => $collectorData) { - foreach ($collectorData as $data) { - $collectedData[$collectedFile][$collectorType][] = $data; - } - } - } - } catch (Throwable $t) { - $internalErrorsCount++; - $internalErrors[] = new InternalError( - $t->getMessage(), - sprintf('analysing file %s', $file), - InternalError::prepareTrace($t), - $t->getTraceAsString(), - shouldReportBug: true, - ); - } - } - - $out->write([ - 'action' => 'result', - 'result' => [ - 'errors' => $errors, - 'internalErrors' => $internalErrors, - 'filteredPhpErrors' => $filteredPhpErrors, - 'allPhpErrors' => $allPhpErrors, - 'locallyIgnoredErrors' => $locallyIgnoredErrors, - 'linesToIgnore' => $linesToIgnore, - 'unmatchedLineIgnores' => $unmatchedLineIgnores, - 'collectedData' => $collectedData, - 'memoryUsage' => memory_get_peak_usage(true), - 'dependencies' => $dependencies, - 'usedTraitDependencies' => $usedTraitDependencies, - 'exportedNodes' => $exportedNodes, - 'files' => $files, - 'processedFiles' => $processedFiles, - 'internalErrorsCount' => $internalErrorsCount, - ]]); - }); - $in->on('error', $handleError); - } - - /** - * @param string[] $analysedFiles - * @return string[] - */ - private function switchTmpFile( - array $analysedFiles, - ?string $insteadOfFile, - ?string $tmpFile, - ): array - { - if ($insteadOfFile === null) { - return $analysedFiles; - } - $analysedFiles = array_values(array_filter($analysedFiles, static fn (string $file): bool => $file !== $insteadOfFile)); - - if ($tmpFile !== null) { - array_unshift($analysedFiles, $tmpFile); - } - - return $analysedFiles; + // Everything after the boot lives in WorkerRunner so a pcntl_fork()-ed + // child can reuse it without re-booting (see ParallelAnalyser). + $workerRunner = $container->getByType(WorkerRunner::class); + + return $workerRunner->run( + $output, + $analysedFiles, + (int) $port, + $identifier, + $tmpFile, + $insteadOfFile, + ); } } diff --git a/src/Parallel/ForkParallelChecker.php b/src/Parallel/ForkParallelChecker.php new file mode 100644 index 00000000000..e75a543c2f6 --- /dev/null +++ b/src/Parallel/ForkParallelChecker.php @@ -0,0 +1,90 @@ +getDisabledReason() === null; + } + + public function print(Output $output): void + { + $output->writeLineFormatted('Parallel worker creation:'); + + $reason = $this->getDisabledReason(); + if ($reason === null) { + $output->writeLineFormatted('Mechanism: fork (pcntl_fork — experimental)'); + $output->writeLineFormatted(''); + return; + } + + $output->writeLineFormatted('Mechanism: spawn (react/child-process)'); + if (getenv('PHPSTAN_PARALLEL_FORK') === '1') { + $output->writeLineFormatted(sprintf('Reason fork not used: %s', $reason)); + } + $output->writeLineFormatted(''); + } + + private function getDisabledReason(): ?string + { + if ( + !function_exists('pcntl_fork') + || !function_exists('pcntl_waitpid') + || !function_exists('pcntl_wifexited') + || !function_exists('pcntl_wexitstatus') + || !function_exists('posix_kill') + ) { + return 'pcntl/posix functions are not available'; + } + + if (getenv('PHPSTAN_PARALLEL_FORK') !== '1') { + return 'PHPSTAN_PARALLEL_FORK environment variable is not set to "1"'; + } + + if ($this->isOpcacheOrJitEnabled()) { + return 'OPcache or JIT is enabled (forked workers require both to be off — their shared memory corrupts under concurrent population)'; + } + + return null; + } + + private function isOpcacheOrJitEnabled(): bool + { + if (!function_exists('opcache_get_status')) { + return false; + } + + $status = opcache_get_status(false); + if ($status === false) { + return false; + } + + if (($status['opcache_enabled'] ?? false) === true) { + return true; + } + + return ($status['jit']['enabled'] ?? false) === true; + } + +} diff --git a/src/Parallel/ForkedProcess.php b/src/Parallel/ForkedProcess.php new file mode 100644 index 00000000000..0d26422d29f --- /dev/null +++ b/src/Parallel/ForkedProcess.php @@ -0,0 +1,151 @@ +setCallbacks($onData, $onError); + + // Created before the fork so the parent can read what the child wrote. + $tmpStdOut = tmpfile(); + if ($tmpStdOut === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $this->stdOut = $tmpStdOut; + + $pid = pcntl_fork(); + + if ($pid === -1) { + fclose($this->stdOut); + $this->stdOut = null; + // Deferred so it runs after ParallelAnalyser has attached this + // process to the pool — otherwise tryQuitProcess() would no-op. + $this->loop->futureTick(static function () use ($onExit): void { + $onExit(null, 'pcntl_fork() failed.'); + }); + return; + } + + if ($pid === 0) { + // Child: drop the inherited listening socket immediately, then run + // the worker on its own fresh event loop and never return. + $this->server->close(); + $output = new StreamOutput($this->stdOut); + $exitCode = $this->workerRunner->run( + $output, + $this->analysedFiles, + $this->serverPort, + $this->identifier, + $this->tmpFile, + $this->insteadOfFile, + ); + exit($exitCode); + } + + // Parent: poll for the child to exit and report it through $onExit. + $this->waitTimer = $this->loop->addPeriodicTimer(self::WAITPID_POLL_INTERVAL, function () use ($pid, $onExit): void { + $status = 0; + $result = pcntl_waitpid($pid, $status, WNOHANG); + if ($result === 0) { + return; + } + + $this->cancelWaitTimer(); + $this->cancelTimer(); + + $exitCode = null; + if ($result > 0 && pcntl_wifexited($status)) { + $exitStatus = pcntl_wexitstatus($status); + if ($exitStatus !== false) { + $exitCode = $exitStatus; + } + } + + $output = ''; + if ($this->stdOut !== null) { + rewind($this->stdOut); + $output = (string) stream_get_contents($this->stdOut); + fclose($this->stdOut); + $this->stdOut = null; + } + + $onExit($exitCode, $output); + }); + } + + public function quit(): void + { + // Ending the connection makes the child's event loop drain and the + // child exit; the waitpid poll timer must keep running until then so + // the child is actually reaped (otherwise: zombie + hang). + $this->endConnection(); + } + + private function cancelWaitTimer(): void + { + if ($this->waitTimer === null) { + return; + } + + $this->loop->cancelTimer($this->waitTimer); + $this->waitTimer = null; + } + +} diff --git a/src/Parallel/ParallelAnalyser.php b/src/Parallel/ParallelAnalyser.php index 248b52a4a34..c5be7a4e546 100644 --- a/src/Parallel/ParallelAnalyser.php +++ b/src/Parallel/ParallelAnalyser.php @@ -52,12 +52,15 @@ public function __construct( float $processTimeout, #[AutowiredParameter(ref: '%parallel.buffer%')] private int $decoderBufferSize, + private ForkParallelChecker $forkParallelChecker, + private WorkerRunner $workerRunner, ) { $this->processTimeout = max($processTimeout, self::DEFAULT_TIMEOUT); } /** + * @param string[] $allAnalysedFiles * @param Closure(int, list=): void|null $postFileCallback * @param (callable(list, list, string[]): void)|null $onFileAnalysisHandler * @return PromiseInterface @@ -65,6 +68,7 @@ public function __construct( public function analyse( LoopInterface $loop, Schedule $schedule, + array $allAnalysedFiles, string $mainScript, ?Closure $postFileCallback, ?string $projectConfigFile, @@ -170,6 +174,8 @@ public function analyse( $this->processPool->quitAll(); }; + $useFork = $this->forkParallelChecker->isSupported(); + for ($i = 0; $i < $numberOfProcesses; $i++) { if (count($jobs) === 0) { break; @@ -190,13 +196,20 @@ public function analyse( $commandOptions[] = escapeshellarg($insteadOfFile); } - $process = new Process(ProcessHelper::getWorkerCommand( + $process = $this->createProcess( + $useFork, + $loop, + $server, + $serverPort, + $processIdentifier, + $allAnalysedFiles, $mainScript, - 'worker', $projectConfigFile, $commandOptions, + $tmpFile, + $insteadOfFile, $input, - ), $loop, $this->processTimeout); + ); $process->start(function (array $json) use ($process, &$internalErrors, &$errors, &$filteredPhpErrors, &$allPhpErrors, &$locallyIgnoredErrors, &$linesToIgnore, &$unmatchedLineIgnores, &$collectedData, &$dependencies, &$usedTraitDependencies, &$exportedNodes, &$peakMemoryUsages, &$jobs, $postFileCallback, &$internalErrorsCount, &$reachedInternalErrorsCountLimit, $processIdentifier, $onFileAnalysisHandler, &$allProcessedFiles): void { $fileErrors = []; foreach ($json['errors'] as $jsonError) { @@ -355,4 +368,46 @@ public function analyse( return $deferred->promise(); } + /** + * @param string[] $allAnalysedFiles + * @param string[] $commandOptions + */ + private function createProcess( + bool $useFork, + LoopInterface $loop, + TcpServer $server, + int $serverPort, + string $processIdentifier, + array $allAnalysedFiles, + string $mainScript, + ?string $projectConfigFile, + array $commandOptions, + ?string $tmpFile, + ?string $insteadOfFile, + InputInterface $input, + ): Process + { + if ($useFork) { + return new ForkedProcess( + $loop, + $this->processTimeout, + $this->workerRunner, + $server, + $serverPort, + $processIdentifier, + $allAnalysedFiles, + $tmpFile, + $insteadOfFile, + ); + } + + return new SpawnedProcess(ProcessHelper::getWorkerCommand( + $mainScript, + 'worker', + $projectConfigFile, + $commandOptions, + $input, + ), $loop, $this->processTimeout); + } + } diff --git a/src/Parallel/Process.php b/src/Parallel/Process.php index 94d1dacc678..3033c13c059 100644 --- a/src/Parallel/Process.php +++ b/src/Parallel/Process.php @@ -2,151 +2,36 @@ namespace PHPStan\Parallel; -use PHPStan\ShouldNotHappenException; -use React\EventLoop\LoopInterface; -use React\EventLoop\TimerInterface; use React\Stream\ReadableStreamInterface; use React\Stream\WritableStreamInterface; use Throwable; -use function fclose; -use function rewind; -use function sprintf; -use function stream_get_contents; -use function tmpfile; -final class Process +/** + * A parallel analysis worker as seen by ParallelAnalyser / ProcessPool. + * + * Implementations differ only in how the worker process comes to life: + * SpawnedProcess spawns a fresh PHP process via react/child-process, + * ForkedProcess forks the already-booted main process via pcntl_fork(). Both + * then speak the same TCP + NDJSON protocol, so request()/quit()/ + * bindConnection() behave identically and live in ProcessBase. + */ +interface Process { - private \React\ChildProcess\Process $process; - - private ?WritableStreamInterface $in = null; - - /** @var resource */ - private $stdOut; - - /** @var resource */ - private $stdErr; - - /** @var callable(mixed[] $json) : void */ - private $onData; - - /** @var callable(Throwable $exception): void */ - private $onError; - - private ?TimerInterface $timer = null; - - public function __construct( - private string $command, - private LoopInterface $loop, - private float $timeoutSeconds, - ) - { - } - /** * @param callable(mixed[] $json) : void $onData * @param callable(Throwable $exception): void $onError * @param callable(?int $exitCode, string $output) : void $onExit */ - public function start(callable $onData, callable $onError, callable $onExit): void - { - $tmpStdOut = tmpfile(); - if ($tmpStdOut === false) { - throw new ShouldNotHappenException('Failed creating temp file for stdout.'); - } - $tmpStdErr = tmpfile(); - if ($tmpStdErr === false) { - throw new ShouldNotHappenException('Failed creating temp file for stderr.'); - } - $this->stdOut = $tmpStdOut; - $this->stdErr = $tmpStdErr; - $this->process = new \React\ChildProcess\Process($this->command, fds: [ - 1 => $this->stdOut, - 2 => $this->stdErr, - ]); - $this->process->start($this->loop); - $this->onData = $onData; - $this->onError = $onError; - $this->process->on('exit', function ($exitCode) use ($onExit): void { - $this->cancelTimer(); - - $output = ''; - rewind($this->stdOut); - $output .= stream_get_contents($this->stdOut); - - rewind($this->stdErr); - $output .= stream_get_contents($this->stdErr); - - $onExit($exitCode, $output); - fclose($this->stdOut); - fclose($this->stdErr); - }); - } - - private function cancelTimer(): void - { - if ($this->timer === null) { - return; - } - - $this->loop->cancelTimer($this->timer); - $this->timer = null; - } + public function start(callable $onData, callable $onError, callable $onExit): void; /** * @param mixed[] $data */ - public function request(array $data): void - { - $this->cancelTimer(); - if ($this->in === null) { - throw new ShouldNotHappenException(); - } - $this->in->write($data); - $this->timer = $this->loop->addTimer($this->timeoutSeconds, function (): void { - $onError = $this->onError; - $onError(new ProcessTimedOutException(sprintf('Child process timed out after %.1f seconds. Try making it longer with parallel.processTimeout setting.', $this->timeoutSeconds))); - }); - } - - public function quit(): void - { - $this->cancelTimer(); - if (!$this->process->isRunning()) { - return; - } - - foreach ($this->process->pipes as $pipe) { - $pipe->close(); - } - - if ($this->in === null) { - return; - } - - $this->in->end(); - } + public function request(array $data): void; - public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void - { - $out->on('data', function (array $json): void { - $this->cancelTimer(); - if ($json['action'] !== 'result') { - return; - } + public function quit(): void; - $onData = $this->onData; - $onData($json['result']); - }); - $this->in = $in; - $out->on('error', function (Throwable $error): void { - $onError = $this->onError; - $onError($error); - }); - $in->on('error', function (Throwable $error): void { - $onError = $this->onError; - $onError($error); - }); - } + public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void; } diff --git a/src/Parallel/ProcessBase.php b/src/Parallel/ProcessBase.php new file mode 100644 index 00000000000..d623f26a25c --- /dev/null +++ b/src/Parallel/ProcessBase.php @@ -0,0 +1,107 @@ +onData = $onData; + $this->onError = $onError; + } + + protected function cancelTimer(): void + { + if ($this->timer === null) { + return; + } + + $this->loop->cancelTimer($this->timer); + $this->timer = null; + } + + /** Cancels the timeout timer and ends the writable side of the connection. */ + protected function endConnection(): void + { + $this->cancelTimer(); + if ($this->in === null) { + return; + } + + $this->in->end(); + } + + /** + * @param mixed[] $data + */ + public function request(array $data): void + { + $this->cancelTimer(); + if ($this->in === null) { + throw new ShouldNotHappenException(); + } + $this->in->write($data); + $this->timer = $this->loop->addTimer($this->timeoutSeconds, function (): void { + $onError = $this->onError; + $onError(new ProcessTimedOutException(sprintf('Child process timed out after %.1f seconds. Try making it longer with parallel.processTimeout setting.', $this->timeoutSeconds))); + }); + } + + public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void + { + $out->on('data', function (array $json): void { + $this->cancelTimer(); + if ($json['action'] !== 'result') { + return; + } + + $onData = $this->onData; + $onData($json['result']); + }); + $this->in = $in; + $out->on('error', function (Throwable $error): void { + $onError = $this->onError; + $onError($error); + }); + $in->on('error', function (Throwable $error): void { + $onError = $this->onError; + $onError($error); + }); + } + +} diff --git a/src/Parallel/SpawnedProcess.php b/src/Parallel/SpawnedProcess.php new file mode 100644 index 00000000000..a7a1954f52e --- /dev/null +++ b/src/Parallel/SpawnedProcess.php @@ -0,0 +1,93 @@ +stdOut = $tmpStdOut; + $this->stdErr = $tmpStdErr; + $this->process = new Process($this->command, fds: [ + 1 => $this->stdOut, + 2 => $this->stdErr, + ]); + $this->process->start($this->loop); + $this->setCallbacks($onData, $onError); + $this->process->on('exit', function ($exitCode) use ($onExit): void { + $this->cancelTimer(); + + $output = ''; + rewind($this->stdOut); + $output .= stream_get_contents($this->stdOut); + + rewind($this->stdErr); + $output .= stream_get_contents($this->stdErr); + + $onExit($exitCode, $output); + fclose($this->stdOut); + fclose($this->stdErr); + }); + } + + public function quit(): void + { + $this->cancelTimer(); + if (!$this->process->isRunning()) { + return; + } + + foreach ($this->process->pipes as $pipe) { + $pipe->close(); + } + + $this->endConnection(); + } + +} diff --git a/src/Parallel/WorkerRunner.php b/src/Parallel/WorkerRunner.php new file mode 100644 index 00000000000..6cb597dc2d8 --- /dev/null +++ b/src/Parallel/WorkerRunner.php @@ -0,0 +1,247 @@ +switchTmpFile($analysedFiles, $insteadOfFile, $tmpFile); + $this->nodeScopeResolver->setAnalysedFiles($analysedFiles); + $analysedFiles = array_fill_keys($analysedFiles, true); + + // Always a fresh event loop: in a forked child the parent's inherited + // loop must never be touched. + $loop = new StreamSelectLoop(); + $errorCount = 0; + + $tcpConnector = new TcpConnector($loop); + $tcpConnector->connect(sprintf('127.0.0.1:%d', $port))->then(function (ConnectionInterface $connection) use ($identifier, $output, $analysedFiles, $tmpFile, $insteadOfFile, &$errorCount): void { + // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly + $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; + // phpcs:enable + $out = new Encoder($connection, $jsonInvalidUtf8Ignore); + $in = new Decoder($connection, true, options: $jsonInvalidUtf8Ignore, maxlength: $this->decoderBufferSize); + $out->write(['action' => 'hello', 'identifier' => $identifier]); + $this->runWorker($out, $in, $output, $analysedFiles, $tmpFile, $insteadOfFile, $errorCount); + }); + + $loop->run(); + + return $errorCount > 0 ? 1 : 0; + } + + /** + * @param array $analysedFiles + */ + private function runWorker( + WritableStreamInterface $out, + ReadableStreamInterface $in, + OutputInterface $output, + array $analysedFiles, + ?string $tmpFile, + ?string $insteadOfFile, + int &$errorCount, + ): void + { + $handleError = static function (Throwable $error) use ($out, $output, &$errorCount): void { + $errorCount++; + $output->writeln(sprintf('Error: %s', $error->getMessage())); + $out->write([ + 'action' => 'result', + 'result' => [ + 'errors' => [], + 'internalErrors' => [ + new InternalError( + $error->getMessage(), + 'communicating with main process in parallel worker', + InternalError::prepareTrace($error), + $error->getTraceAsString(), + shouldReportBug: true, + ), + ], + 'filteredPhpErrors' => [], + 'allPhpErrors' => [], + 'locallyIgnoredErrors' => [], + 'linesToIgnore' => [], + 'unmatchedLineIgnores' => [], + 'collectedData' => [], + 'memoryUsage' => memory_get_peak_usage(true), + 'dependencies' => [], + 'exportedNodes' => [], + 'files' => [], + 'internalErrorsCount' => 1, + ], + ]); + $out->end(); + }; + $out->on('error', $handleError); + $fileAnalyser = $this->fileAnalyser; + $ruleRegistry = $this->ruleRegistry; + $collectorRegistry = $this->collectorRegistry; + $in->on('data', static function (array $json) use ($fileAnalyser, $ruleRegistry, $collectorRegistry, $out, $analysedFiles, $tmpFile, $insteadOfFile): void { + $action = $json['action']; + if ($action !== 'analyse') { + return; + } + + $internalErrorsCount = 0; + $files = $json['files']; + $errors = []; + $internalErrors = []; + $filteredPhpErrors = []; + $allPhpErrors = []; + $locallyIgnoredErrors = []; + $linesToIgnore = []; + $unmatchedLineIgnores = []; + $collectedData = []; + $dependencies = []; + $usedTraitDependencies = []; + $exportedNodes = []; + $processedFiles = []; + foreach ($files as $file) { + try { + if ($file === $insteadOfFile) { + $file = $tmpFile; + } + $fileAnalyserResult = $fileAnalyser->analyseFile($file, $analysedFiles, $ruleRegistry, $collectorRegistry, null); + $fileErrors = $fileAnalyserResult->getErrors(); + $filteredPhpErrors = array_merge($filteredPhpErrors, $fileAnalyserResult->getFilteredPhpErrors()); + $allPhpErrors = array_merge($allPhpErrors, $fileAnalyserResult->getAllPhpErrors()); + $linesToIgnore[$file] = $fileAnalyserResult->getLinesToIgnore(); + $unmatchedLineIgnores[$file] = $fileAnalyserResult->getUnmatchedLineIgnores(); + $dependencies[$file] = $fileAnalyserResult->getDependencies(); + $usedTraitDependencies[$file] = $fileAnalyserResult->getUsedTraitDependencies(); + $exportedNodes[$file] = $fileAnalyserResult->getExportedNodes(); + $processedFiles = array_merge($processedFiles, $fileAnalyserResult->getProcessedFiles()); + foreach ($fileErrors as $fileError) { + $errors[] = $fileError; + } + foreach ($fileAnalyserResult->getLocallyIgnoredErrors() as $locallyIgnoredError) { + $locallyIgnoredErrors[] = $locallyIgnoredError; + } + foreach ($fileAnalyserResult->getCollectedData() as $collectedFile => $dataPerCollector) { + foreach ($dataPerCollector as $collectorType => $collectorData) { + foreach ($collectorData as $data) { + $collectedData[$collectedFile][$collectorType][] = $data; + } + } + } + } catch (Throwable $t) { + $internalErrorsCount++; + $internalErrors[] = new InternalError( + $t->getMessage(), + sprintf('analysing file %s', $file), + InternalError::prepareTrace($t), + $t->getTraceAsString(), + shouldReportBug: true, + ); + } + } + + $out->write([ + 'action' => 'result', + 'result' => [ + 'errors' => $errors, + 'internalErrors' => $internalErrors, + 'filteredPhpErrors' => $filteredPhpErrors, + 'allPhpErrors' => $allPhpErrors, + 'locallyIgnoredErrors' => $locallyIgnoredErrors, + 'linesToIgnore' => $linesToIgnore, + 'unmatchedLineIgnores' => $unmatchedLineIgnores, + 'collectedData' => $collectedData, + 'memoryUsage' => memory_get_peak_usage(true), + 'dependencies' => $dependencies, + 'usedTraitDependencies' => $usedTraitDependencies, + 'exportedNodes' => $exportedNodes, + 'files' => $files, + 'processedFiles' => $processedFiles, + 'internalErrorsCount' => $internalErrorsCount, + ]]); + }); + $in->on('error', $handleError); + } + + /** + * @param string[] $analysedFiles + * @return string[] + */ + private function switchTmpFile( + array $analysedFiles, + ?string $insteadOfFile, + ?string $tmpFile, + ): array + { + if ($insteadOfFile === null) { + return $analysedFiles; + } + $analysedFiles = array_values(array_filter($analysedFiles, static fn (string $file): bool => $file !== $insteadOfFile)); + + if ($tmpFile !== null) { + array_unshift($analysedFiles, $tmpFile); + } + + return $analysedFiles; + } + +} diff --git a/src/Process/ForkedProcessPromise.php b/src/Process/ForkedProcessPromise.php new file mode 100644 index 00000000000..8381b8acbeb --- /dev/null +++ b/src/Process/ForkedProcessPromise.php @@ -0,0 +1,179 @@ + */ + private Deferred $deferred; + + private ?int $childPid = null; + + /** @var resource|null */ + private $stdOut = null; + + private ?TimerInterface $waitTimer = null; + + private bool $canceled = false; + + /** + * @param string[] $inceptionFiles + * @param mixed[]|null $projectConfigArray + */ + public function __construct( + private LoopInterface $loop, + private FixerWorkerRunner $fixerWorkerRunner, + private TcpServer $server, + private Output $errorOutput, + private array $inceptionFiles, + private bool $isOnlyFiles, + private ?array $projectConfigArray, + private ?string $configuration, + private int $serverPort, + private InputInterface $input, + ) + { + $this->deferred = new Deferred(function (): void { + $this->cancel(); + }); + } + + /** + * @return PromiseInterface + */ + public function run(): PromiseInterface + { + // Created before the fork so the parent can read what the child wrote. + $tmpStdOut = tmpfile(); + if ($tmpStdOut === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $this->stdOut = $tmpStdOut; + + $pid = pcntl_fork(); + + if ($pid === -1) { + fclose($this->stdOut); + $this->stdOut = null; + // Deferred so it runs after FixerApplication has stored the promise. + $this->loop->futureTick(function (): void { + $this->deferred->reject(new ProcessCrashedException('pcntl_fork() failed.')); + }); + + return $this->deferred->promise(); + } + + if ($pid === 0) { + // Child: drop the inherited listening socket immediately, then run + // the worker on its own fresh event loop and never return. + $this->server->close(); + $exitCode = $this->fixerWorkerRunner->run( + $this->errorOutput, + $this->inceptionFiles, + $this->isOnlyFiles, + $this->projectConfigArray, + $this->configuration, + $this->serverPort, + $this->input, + ); + exit($exitCode); + } + + // Parent: poll for the child to exit and resolve/reject accordingly. + $this->childPid = $pid; + $this->waitTimer = $this->loop->addPeriodicTimer(self::WAITPID_POLL_INTERVAL, function () use ($pid): void { + $status = 0; + $result = pcntl_waitpid($pid, $status, WNOHANG); + if ($result === 0) { + return; + } + + $this->cancelWaitTimer(); + + $output = ''; + if ($this->stdOut !== null) { + rewind($this->stdOut); + $output = (string) stream_get_contents($this->stdOut); + fclose($this->stdOut); + $this->stdOut = null; + } + + if ($this->canceled) { + // cancel() already rejected the promise; just reap the child. + return; + } + + $exitCode = null; + if ($result > 0 && pcntl_wifexited($status)) { + $exitStatus = pcntl_wexitstatus($status); + if ($exitStatus !== false) { + $exitCode = $exitStatus; + } + } + + if ($exitCode === 0) { + $this->deferred->resolve($output); + return; + } + + $this->deferred->reject(new ProcessCrashedException($output)); + }); + + return $this->deferred->promise(); + } + + private function cancel(): void + { + if ($this->childPid === null) { + throw new ShouldNotHappenException('Cancelling process before running'); + } + $this->canceled = true; + // SIGTERM the child; the waitpid poll timer keeps running so it still + // gets reaped (otherwise: zombie). + posix_kill($this->childPid, SIGTERM); + $this->deferred->reject(new ProcessCanceledException()); + } + + private function cancelWaitTimer(): void + { + if ($this->waitTimer === null) { + return; + } + + $this->loop->cancelTimer($this->waitTimer); + $this->waitTimer = null; + } + +} diff --git a/src/Process/ProcessPromise.php b/src/Process/ProcessPromise.php index 31c9f0b7615..1d588c42073 100644 --- a/src/Process/ProcessPromise.php +++ b/src/Process/ProcessPromise.php @@ -2,91 +2,23 @@ namespace PHPStan\Process; -use PHPStan\ShouldNotHappenException; -use React\ChildProcess\Process; -use React\EventLoop\LoopInterface; -use React\Promise\Deferred; use React\Promise\PromiseInterface; -use function fclose; -use function rewind; -use function stream_get_contents; -use function tmpfile; -final class ProcessPromise +/** + * A PHPStan Pro analysis worker as seen by FixerApplication. + * + * Implementations differ only in how the worker process comes to life: + * SpawnedProcessPromise spawns a fresh PHP process via react/child-process, + * ForkedProcessPromise forks the already-booted main process via pcntl_fork(). + * Both yield a promise that resolves on success and rejects with + * ProcessCrashedException / ProcessCanceledException otherwise. + */ +interface ProcessPromise { - /** @var Deferred */ - private Deferred $deferred; - - private ?Process $process = null; - - private bool $canceled = false; - - public function __construct(private LoopInterface $loop, private string $command) - { - $this->deferred = new Deferred(function (): void { - $this->cancel(); - }); - } - /** * @return PromiseInterface */ - public function run(): PromiseInterface - { - $tmpStdOutResource = tmpfile(); - if ($tmpStdOutResource === false) { - throw new ShouldNotHappenException('Failed creating temp file for stdout.'); - } - $tmpStdErrResource = tmpfile(); - if ($tmpStdErrResource === false) { - throw new ShouldNotHappenException('Failed creating temp file for stderr.'); - } - - $this->process = new Process($this->command, fds: [ - 1 => $tmpStdOutResource, - 2 => $tmpStdErrResource, - ]); - $this->process->start($this->loop); - - $this->process->on('exit', function ($exitCode) use ($tmpStdOutResource, $tmpStdErrResource): void { - if ($this->canceled) { - fclose($tmpStdOutResource); - fclose($tmpStdErrResource); - return; - } - rewind($tmpStdOutResource); - $stdOut = stream_get_contents($tmpStdOutResource); - fclose($tmpStdOutResource); - - rewind($tmpStdErrResource); - $stdErr = stream_get_contents($tmpStdErrResource); - fclose($tmpStdErrResource); - - if ($exitCode === null) { - $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); - return; - } - - if ($exitCode === 0) { - $this->deferred->resolve($stdOut); - return; - } - - $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); - }); - - return $this->deferred->promise(); - } - - private function cancel(): void - { - if ($this->process === null) { - throw new ShouldNotHappenException('Cancelling process before running'); - } - $this->canceled = true; - $this->process->terminate(); - $this->deferred->reject(new ProcessCanceledException()); - } + public function run(): PromiseInterface; } diff --git a/src/Process/SpawnedProcessPromise.php b/src/Process/SpawnedProcessPromise.php new file mode 100644 index 00000000000..4504e4161d0 --- /dev/null +++ b/src/Process/SpawnedProcessPromise.php @@ -0,0 +1,98 @@ + */ + private Deferred $deferred; + + private ?Process $process = null; + + private bool $canceled = false; + + public function __construct(private LoopInterface $loop, private string $command) + { + $this->deferred = new Deferred(function (): void { + $this->cancel(); + }); + } + + /** + * @return PromiseInterface + */ + public function run(): PromiseInterface + { + $tmpStdOutResource = tmpfile(); + if ($tmpStdOutResource === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $tmpStdErrResource = tmpfile(); + if ($tmpStdErrResource === false) { + throw new ShouldNotHappenException('Failed creating temp file for stderr.'); + } + + $this->process = new Process($this->command, fds: [ + 1 => $tmpStdOutResource, + 2 => $tmpStdErrResource, + ]); + $this->process->start($this->loop); + + $this->process->on('exit', function ($exitCode) use ($tmpStdOutResource, $tmpStdErrResource): void { + if ($this->canceled) { + fclose($tmpStdOutResource); + fclose($tmpStdErrResource); + return; + } + rewind($tmpStdOutResource); + $stdOut = stream_get_contents($tmpStdOutResource); + fclose($tmpStdOutResource); + + rewind($tmpStdErrResource); + $stdErr = stream_get_contents($tmpStdErrResource); + fclose($tmpStdErrResource); + + if ($exitCode === null) { + $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); + return; + } + + if ($exitCode === 0) { + $this->deferred->resolve($stdOut); + return; + } + + $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); + }); + + return $this->deferred->promise(); + } + + private function cancel(): void + { + if ($this->process === null) { + throw new ShouldNotHappenException('Cancelling process before running'); + } + $this->canceled = true; + $this->process->terminate(); + $this->deferred->reject(new ProcessCanceledException()); + } + +}