file level parallelization of the dataflow analysis #2088
Draft
EagleoutIce wants to merge 74 commits into main from
Conversation
- worker.ts: handles each task by calling the appropriate workerTasks
- task-registry.ts: contains all definitions for the workerTasks, used by worker.ts
- threadpool.ts: wrapper for tinypool, handles dispatching tasks and creating/deleting the worker threads
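The dispatch-by-name pattern described above can be sketched as follows. This is a minimal illustration, not flowr's actual code: the task names `parseFile`/`analyzeFile`, their payload shapes, and the `runTask` helper are all hypothetical.

```typescript
// Hypothetical sketch of the task-registry pattern: task names map to
// handler functions, and the worker dispatches by looking up the name.
type TaskHandler<TIn, TOut> = (payload: TIn) => TOut | Promise<TOut>;

// illustrative registry; the real workerTasks definitions live in task-registry.ts
const workerTasks = {
  parseFile: (payload: { path: string }) => `parsed:${payload.path}`,
  analyzeFile: (payload: { index: number }) => payload.index * 2,
} as const;

// worker-side dispatch: look up the handler registered for the requested task
async function runTask(name: string, payload: unknown): Promise<unknown> {
  const handler =
    (workerTasks as unknown as Record<string, TaskHandler<unknown, unknown> | undefined>)[name];
  if (handler === undefined) {
    throw new Error(`no handler registered for task '${name}'`);
  }
  return handler(payload);
}
```

Keeping the registry in one module lets both the worker (for dispatch) and the main thread (for payload typing) import the same definitions.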
basic dispatch of all files to analyze to the threadpool. Currently fails because the path to the worker file is not correctly resolved.

```diff
diff --git a/src/dataflow/extractor.ts b/src/dataflow/extractor.ts
index 34dfe00..83f6be3 100644
--- a/src/dataflow/extractor.ts
+++ b/src/dataflow/extractor.ts
@@ -26,6 +26,8 @@ import type { ControlFlowInformation } from '../control-flow/control-flow-graph'
 import { getBuiltInDefinitions } from './environments/built-in-config';
 import type { FlowrAnalyzerContext } from '../project/context/flowr-analyzer-context';
 import { FlowrFile } from '../project/context/flowr-file';
+import { Threadpool } from './parallel/threadpool';
+import { SourceFilePayload } from './parallel/task-registry';

 /**
  * The best friend of {@link produceDataFlowGraph} and {@link processDataflowFor}.
@@ -118,6 +120,20 @@ export function produceDataFlowGraph<OtherInfo>(
 	};
 	let df = processDataflowFor<OtherInfo>(files[0].root, dfData);
+	// first call with threadpool
+	const pool = new Threadpool();
+
+	// submit all files
+	const result = pool.submitTasks<SourceFilePayload<OtherInfo>, void>(
+		'testPool',
+		files.map((file, i) => ({
+			index: i,
+			file,
+			data: dfData,
+			dataflowInfo: df,
+		}))
+	)
 	for(let i = 1; i < files.length; i++) {
 		/* source requests register automatically */
 		df = standaloneSourceFile(i, files[i], dfData, df);
```
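The "path to worker file is not correctly resolved" failure mentioned above is commonly caused by resolving the worker file against the process working directory instead of the module's own location. A hedged sketch of the usual fix; the `worker.js` filename and `resolveWorkerFile` helper are illustrative, not flowr's actual code:

```typescript
import * as path from 'node:path';

// Resolve the worker file relative to the module directory (e.g. __dirname in
// CommonJS, or a directory derived from import.meta.url in ESM), not the
// process CWD -- otherwise pool creation breaks when the program is started
// from a different working directory.
function resolveWorkerFile(moduleDir: string): string {
  // 'worker.js' is a placeholder for the compiled worker entry point
  return path.resolve(moduleDir, 'worker.js');
}
```

A pool wrapper would then pass `resolveWorkerFile(__dirname)` (or the ESM equivalent) as the worker filename instead of a bare relative path.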
- threadpool.ts: contains the piscina wrapper > now handles worker creation with MessagePorts correctly; tasks can submit more tasks into the queue
- worker.ts: actual worker file for the threadpool > handles port registration to the main thread, chooses the appropriate handler for tasks, handles subtask submission and collection
- task-registry.ts: handler definitions for tasks > contains the relevant types and interfaces, specifies each handler for a given task
- extractor.ts: dataflow extractor > modified to dispatch a dummy call for all files; threadpool creation currently happens for each call -> needs to be moved out
- extractor.ts: aggregates all dataflow information and then merges it back together via reduction
- built-in-source.ts: dataflow merging is now in a separate function
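The merge-via-reduction step above can be illustrated with a toy example. The `Df` shape and `mergeDataflow` below are placeholders, not flowr's actual dataflow structures; the point is only the fold over per-file results:

```typescript
// Toy stand-in for a per-file dataflow result: defined names plus
// references that could not yet be resolved within that file.
interface Df {
  nodes: string[];
  unresolved: string[];
}

// merge two partial results; references resolvable by either side are dropped
function mergeDataflow(a: Df, b: Df): Df {
  return {
    nodes: [...new Set([...a.nodes, ...b.nodes])],
    unresolved: [...a.unresolved, ...b.unresolved]
      .filter(r => !a.nodes.includes(r) && !b.nodes.includes(r)),
  };
}

// per-file results are folded into one aggregate via reduction
const perFile: Df[] = [
  { nodes: ['f'], unresolved: ['g'] }, // file 1 defines f, references g
  { nodes: ['g'], unresolved: [] },    // file 2 defines g
];
const merged = perFile.reduce(mergeDataflow);
```

Because the merge is associative in this sketch, the reduction order does not change the aggregate, which is what makes a parallel per-file analysis followed by a merge viable.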
- feature-def.ts: contains all features and their default values
- feature-manager.ts: exposes functionality for setting and checking the feature flags
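A minimal sketch of the defaults-plus-manager split described above (and of the later commit turning the manager into a class rather than a global object). The flag names and method names here are illustrative, not flowr's actual API:

```typescript
// feature-def.ts analogue: all features and their default values in one place
const featureDefaults = {
  parallelDataflow: false, // hypothetical flag name
  deferredMerge: false,    // hypothetical flag name
} as const;

type Feature = keyof typeof featureDefaults;

// feature-manager.ts analogue: a class (not a global), so each analyzer
// instance can carry its own flag state
class FeatureManager {
  private readonly flags: Record<Feature, boolean>;

  constructor() {
    this.flags = { ...featureDefaults }; // start from the declared defaults
  }

  set(feature: Feature, value: boolean): void {
    this.flags[feature] = value;
  }

  isEnabled(feature: Feature): boolean {
    return this.flags[feature];
  }
}

const features = new FeatureManager();
features.set('parallelDataflow', true);
```

Making the manager instance-scoped (as a later commit in this PR does) avoids cross-test leakage of flag state, which matters once tests run the analyzer repeatedly.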
…s' of github.com:flowr-analysis/flowr into 2042-file-level-parallelization-of-the-dataflow-analysis
now uses the threadId provided by piscina; more logging statements <- remove later

```diff
diff --git a/src/dataflow/parallel/worker.ts b/src/dataflow/parallel/worker.ts
index 5fd490b..bb0bba2 100644
--- a/src/dataflow/parallel/worker.ts
+++ b/src/dataflow/parallel/worker.ts
@@ -1,4 +1,4 @@
-import { parentPort, MessageChannel, workerData } from 'node:worker_threads';
+import { parentPort, MessageChannel, workerData, threadId } from 'node:worker_threads';
 import type { TaskName } from './task-registry';
 import { workerTasks } from './task-registry';
 import type { SubtaskReceivedMessage } from './threadpool';
@@ -15,27 +15,27 @@ const pending = new Map<
 	PendingEntry<unknown>
 >();
+
 const { port1: workerPort, port2: mainPort } = new MessageChannel();

 if(!parentPort){
 	dataflowLogger.error('Worker started without parentPort present, Aborting worker');
 } else {
+	//console.log(`Worker ${workerData.workerId} registering port to main thread.`);
+	console.log(threadId);
 	parentPort.postMessage({
 		type: 'register-port',
-		workerId: typeof workerData === 'object' &&
-			workerData !== null &&
-			typeof (workerData as { id?: number }).id === 'number'
-			? (workerData as { id: number }).id : Math.floor(Math.random() * 1e9),
+		workerId: threadId,
 		port: mainPort,
 	},
 	[mainPort] // transfer port to main thread
 	);
 }
-
 workerPort.on('message', (msg: unknown) => {
 	if(isSubtaskResponseMessage(msg)){
 		const { id, result, error } = msg;
+		console.log(`got response for ${id}`);
 		const entry = pending.get(id);
 		if(!entry) {
 			return;
@@ -59,7 +59,7 @@ async function runSubtask<TInput, TOutput>(taskName: TaskName, taskPayload: TInp
 	//return undefined as unknown as TOutput;
 	return new Promise((resolve, reject) => {
 		pending.set(id, { resolve: resolve as (value: unknown) => void, reject });
-
+		console.log(`submitting subtask with ${id} from ${threadId}`);
 		// submit the subtask to main thread
 		workerPort.postMessage({
 			type: 'subtask',
```
- fix for broken worker path
- fixed subtask resolution
- allowed deferred dataflow merge
- definitions for clonable dataflow data
- example usage of clonable data
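The need for "clonable dataflow data" comes from how worker threads exchange values: everything crossing a `MessagePort` goes through the structured clone algorithm, so functions, and objects carrying them, cannot be sent. A quick demonstration (assumes Node >= 17 for the global `structuredClone`):

```typescript
// Plain data and built-in containers like Map survive structured cloning...
const clonable = {
  ids: [1, 2, 3],
  env: new Map([['x', 'def-1']]),
};
const copy = structuredClone(clonable);

// ...but functions do not: cloning them throws a DataCloneError
let functionsClone = true;
try {
  structuredClone({ fn: () => 1 });
} catch {
  functionsClone = false;
}
```

This is why dataflow structures destined for workers have to be expressed as plain serializable data (or reconstructed on the worker side) rather than as objects with attached behavior.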
- use workerWrapper to load and register ts-node for the worker file
threadpool now waits for worker initialization to conclude
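The initialization barrier described above can be sketched as a promise that resolves once every worker has signaled readiness. This is an illustrative stand-in; the real pool would resolve on each worker's `register-port` message rather than the simulated `setTimeout` used here:

```typescript
// Resolve once `workerCount` workers have each invoked their ready-signal.
// `onReady` hands every (simulated) worker a callback to call after init.
function waitForWorkers(
  workerCount: number,
  onReady: (signal: () => void) => void
): Promise<void> {
  return new Promise(resolve => {
    let ready = 0;
    for (let i = 0; i < workerCount; i++) {
      onReady(() => {
        ready++;
        if (ready === workerCount) {
          resolve(); // all workers checked in: the pool may accept tasks
        }
      });
    }
  });
}

// usage: simulate 4 workers finishing initialization asynchronously
const initDone = waitForWorkers(4, signal => setTimeout(signal, 1));
let poolReady = false;
initDone.then(() => { poolReady = true; });
```

Awaiting this barrier before submitting tasks avoids the race where work is dispatched to a worker whose message port has not yet been registered with the main thread.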
the feature manager is now a class and not a global object
integrated the feature manager into the flowrAnalyzerBuilder and flowrAnalyzer.
close the analyzer after each test, but without shutting down the parser backend
worker tracks internal state and returns the data with each successful task -> best effort; workerpool tracks its own internal state and the worker state with a best-effort approach
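The best-effort bookkeeping above can be sketched as follows: each completed task carries the worker's internal state alongside its result, and the pool simply records the last state seen per worker. All shapes here are illustrative, not flowr's actual types:

```typescript
// hypothetical result envelope: task output plus a snapshot of worker state
interface TaskResult<T> {
  workerId: number;
  result: T;
  workerState: { processedFiles: number };
}

class PoolStateTracker {
  // last state snapshot seen per worker; "best effort" because a snapshot
  // may already be stale by the time it arrives -- later ones overwrite it
  private readonly lastKnown = new Map<number, { processedFiles: number }>();

  record<T>(res: TaskResult<T>): T {
    this.lastKnown.set(res.workerId, res.workerState);
    return res.result;
  }

  stateOf(workerId: number): { processedFiles: number } | undefined {
    return this.lastKnown.get(workerId);
  }
}

const tracker = new PoolStateTracker();
tracker.record({ workerId: 1, result: 'df-a', workerState: { processedFiles: 1 } });
tracker.record({ workerId: 1, result: 'df-b', workerState: { processedFiles: 2 } });
```

Piggybacking state on task results avoids extra round-trip messages, at the cost of the pool's view lagging behind the worker's true state between completions.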
- workerpool: tests for any kind of memory or message port leak -> will certainly break if run in parallel
- parallel-dataflow: tests whether the output is equivalent to the sequential analysis
all pool messages and type-checking helpers were moved to their own file for clarity
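The extracted type-checking helpers follow the standard tagged-union pattern: pool messages carry a `type` discriminant, and guard functions narrow `unknown` payloads arriving over a port. The message shapes below are illustrative, not flowr's actual definitions:

```typescript
// hypothetical pool message types, discriminated by `type`
interface SubtaskMessage {
  type: 'subtask';
  id: number;
  taskName: string;
}

interface RegisterPortMessage {
  type: 'register-port';
  workerId: number;
}

type PoolMessage = SubtaskMessage | RegisterPortMessage;

// guards narrow the `unknown` values delivered by port.on('message', ...)
function isPoolMessage(msg: unknown): msg is PoolMessage {
  return typeof msg === 'object' && msg !== null && 'type' in msg;
}

function isSubtaskMessage(msg: unknown): msg is SubtaskMessage {
  return isPoolMessage(msg) && msg.type === 'subtask';
}
```

Centralizing these in one module keeps the worker and the pool agreeing on the wire format, since both sides import the same guards.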
tests are now grouped into simple test suites of files; the test runner marks the execution of each test
graphs had incorrect labels in tests
post-merge now correctly discards built-in references that can be resolved by the merged environment and hold no other user-defined vars -> such a reference is only a placeholder
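The cleanup rule above can be illustrated with a toy filter: a reference is dropped only when the merged environment resolves it *and* it carries no user-defined definitions (i.e. it was a pure placeholder). The `Ref` shape and function below are illustrative, not flowr's actual structures:

```typescript
// hypothetical reference record: a name plus any user-defined definitions
// attached to it during per-file analysis
interface Ref {
  name: string;
  userDefs: string[];
}

function discardResolvedBuiltins(refs: Ref[], mergedEnv: Set<string>): Ref[] {
  return refs.filter(ref =>
    // keep the reference unless the merged env resolves it AND it holds
    // no user-defined definitions (then it was only a placeholder)
    !(mergedEnv.has(ref.name) && ref.userDefs.length === 0)
  );
}

const refs: Ref[] = [
  { name: 'print', userDefs: [] },        // placeholder, resolvable -> drop
  { name: 'print', userDefs: ['def-3'] }, // user redefinition -> keep
];
const kept = discardResolvedBuiltins(refs, new Set(['print']));
```

Keeping references with user definitions is what makes this safe in the presence of built-in redefinitions, which the same commit adds monitoring for.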
- new built-in redefinition monitoring
- fallback to sequential analysis
- slightly improved linking stage
- new test cases
- split tests into new files
- new test suites
- restructured tests into separate suites
- added new test cases
- updated tests to pass
- currently failing tests are marked
No description provided.