From ca17e9a9ae507081340ef1a47b28308fcf73413c Mon Sep 17 00:00:00 2001 From: jbirddog Date: Mon, 27 Apr 2026 17:18:32 +0000 Subject: [PATCH] Make BPMN waiting refresh internal and lazy refresh_waiting_tasks() forced consumers to understand when BPMN internals needed WAITING tasks refreshed. Because the method scanned waiting tasks and timers, consumers had to call it defensively, making it both a leaky abstraction and a performance trap for hot-path runners. Keep the public BpmnWorkflow.refresh_waiting_tasks() method as a compatibility no-op and move refresh behavior into engine-owned paths: index WAITING tasks as states change, prune removed tasks, refresh structural waits from task completion notifications, refresh matched event catches directly, schedule timer waits by due time, and rebuild the private index after deserialization. Add generated BPMN stress fixtures and a Makefile target for repeatable measurement: make RUN='uv run' waiting-task-stress. Stress results with 100 dormant timers and 100 ready hot-path steps: ready hot path dropped from 10,300 to 100 timer checks (103x fewer), and staggered timers dropped from 1,900 to 10 timer checks (190x fewer). Final stress elapsed times were about 0.092s and 0.0044s respectively. Verified with: make RUN='uv run' tests; make RUN='uv run' waiting-task-stress. --- Makefile | 4 + .../bpmn/serializer/default/workflow.py | 1 + SpiffWorkflow/bpmn/util/subworkflow.py | 7 +- SpiffWorkflow/bpmn/workflow.py | 166 ++++++++++++++++-- SpiffWorkflow/task.py | 3 + SpiffWorkflow/workflow.py | 2 + .../bpmn/WaitingTaskStressBenchmark.py | 141 +++++++++++++++ .../bpmn/WaitingTaskStressTest.py | 110 ++++++++++++ .../bpmn/events/TimerCycleTest.py | 2 +- .../bpmn/events/TimerDurationTest.py | 2 +- .../bpmn/events/TimerIntermediateTest.py | 5 +- .../SpiffWorkflow/bpmn/waiting_task_stress.py | 152 ++++++++++++++++ 12 files changed, 572 insertions(+), 23 deletions(-) create mode 100644 tests/SpiffWorkflow/bpmn/WaitingTaskStressBenchmark.py create mode 100644 tests/SpiffWorkflow/bpmn/WaitingTaskStressTest.py create mode 100644 tests/SpiffWorkflow/bpmn/waiting_task_stress.py diff --git a/Makefile b/Makefile index 4300d45b..4e8619db 100644 --- a/Makefile +++ b/Makefile @@ -59,6 +59,10 @@ tests-ind: tests-timing: @make tests-ind 2>&1 | ./scripts/test_times.py +.PHONY : waiting-task-stress +waiting-task-stress: + $(RUN) python -m unittest -v tests.SpiffWorkflow.bpmn.WaitingTaskStressBenchmark + wheel: clean $(RUN) python -m build --sdist --wheel --outdir dist/ diff --git a/SpiffWorkflow/bpmn/serializer/default/workflow.py b/SpiffWorkflow/bpmn/serializer/default/workflow.py index d2c34c16..9284c1d1 100644 --- a/SpiffWorkflow/bpmn/serializer/default/workflow.py +++ b/SpiffWorkflow/bpmn/serializer/default/workflow.py @@ -201,6 +201,7 @@ def from_dict(self, dct): # Handle the remaining top workflow attributes self.subprocesses_from_dict(dct['subprocesses'], workflow) workflow.bpmn_events = self.registry.restore(dct.pop('bpmn_events', [])) + workflow._rebuild_waiting_task_index() return workflow diff --git a/SpiffWorkflow/bpmn/util/subworkflow.py b/SpiffWorkflow/bpmn/util/subworkflow.py index 6968a1f1..1f60e7dc 100644 --- a/SpiffWorkflow/bpmn/util/subworkflow.py +++ b/SpiffWorkflow/bpmn/util/subworkflow.py @@ -35,6 +35,12 @@ def data_objects(self): def get_tasks_iterator(self, first_task=None, **kwargs): return BpmnTaskIterator(first_task or self.task_tree, **kwargs) + def update_waiting_tasks(self): + self.top_workflow._refresh_internal_waiting_tasks() + + def _task_state_changed_notify(self, task, old_state, new_state): + self.top_workflow._waiting_task_state_changed(task, old_state, new_state) + class BpmnSubWorkflow(BpmnBaseWorkflow): @@ -68,4 +74,3 @@ def collect_log_extras(self, dct=None): dct = super().collect_log_extras() dct.update({'parent_task_id': self.parent_task_id}) return dct - diff --git a/SpiffWorkflow/bpmn/workflow.py b/SpiffWorkflow/bpmn/workflow.py index 96015265..476b7819 100644 --- a/SpiffWorkflow/bpmn/workflow.py +++ b/SpiffWorkflow/bpmn/workflow.py @@ -17,6 +17,9 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301 USA +import heapq +from datetime import datetime, timezone + from SpiffWorkflow.task import Task from SpiffWorkflow.util.task import TaskState from SpiffWorkflow.exceptions import WorkflowException @@ -24,6 +27,8 @@ from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent from SpiffWorkflow.bpmn.specs.mixins.events.start_event import StartEvent from SpiffWorkflow.bpmn.specs.mixins.subworkflow_task import CallActivity +from SpiffWorkflow.bpmn.specs.event_definitions.multiple import MultipleEventDefinition +from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition from SpiffWorkflow.bpmn.specs.event_definitions.item_aware_event import CodeEventDefinition from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit @@ -33,6 +38,98 @@ from .script_engine.python_engine import PythonScriptEngine +class _WaitingTaskIndex: + + def __init__(self): + self.waiting_tasks = {} + self.timer_tasks = {} + self.timer_due_at = {} + self.timer_heap = [] + self._sequence = 0 + + def task_state_changed(self, task, old_state, new_state): + if old_state == TaskState.WAITING: + self._remove(task) + if new_state == TaskState.WAITING: + self._add(task) + + def refresh_internal_tasks(self, refresh_task): + for task in list(self.waiting_tasks.values()): + if task.id not in self.timer_tasks and task.state == TaskState.WAITING: + refresh_task(task) + self.refresh_due_timers(refresh_task) + + def refresh_due_timers(self, refresh_task): + self._schedule_missing_timer_due_times(refresh_task) + now = datetime.now(timezone.utc).timestamp() + while self.timer_heap and self.timer_heap[0][0] <= now: + due_at, _sequence, task_id = heapq.heappop(self.timer_heap) + if self.timer_due_at.get(task_id) != due_at: + continue + task = self.timer_tasks.get(task_id) + if task is None or task.state != TaskState.WAITING: + continue + refresh_task(task) + + def refresh_tasks(self, tasks, refresh_task): + for task in tasks: + refresh_task(task) + + def reschedule_timer(self, task): + if task.id in self.timer_tasks: + self._schedule_timer(task) + + def _add(self, task): + self.waiting_tasks[task.id] = task + if self._is_timer_task(task): + self.timer_tasks[task.id] = task + self._schedule_timer(task) + + def _remove(self, task): + self.waiting_tasks.pop(task.id, None) + self.timer_tasks.pop(task.id, None) + self.timer_due_at.pop(task.id, None) + + def _schedule_missing_timer_due_times(self, refresh_task): + for task in list(self.timer_tasks.values()): + if task.state != TaskState.WAITING: + continue + if task.id not in self.timer_due_at: + refresh_task(task) + + def _schedule_timer(self, task): + due_at = self._get_timer_due_at(task) + if due_at is None: + self.timer_due_at.pop(task.id, None) + return + self.timer_due_at[task.id] = due_at + self._sequence += 1 + heapq.heappush(self.timer_heap, (due_at, self._sequence, task.id)) + + def _is_timer_task(self, task): + return isinstance(task.task_spec, CatchingEvent) and self._has_timer_definition(task.task_spec.event_definition) + + def _has_timer_definition(self, event_definition): + if isinstance(event_definition, TimerEventDefinition): + return True + if isinstance(event_definition, MultipleEventDefinition): + return any(self._has_timer_definition(definition) for definition in event_definition.event_definitions) + return False + + def _get_timer_due_at(self, task): + event_value = task._get_internal_data('event_value') + if event_value is None: + return None + if isinstance(event_value, dict): + if event_value.get('cycles') == 0: + return 0 + next_event = event_value.get('next') + if next_event is None: + return None + return TimerEventDefinition.get_datetime(next_event).timestamp() + return TimerEventDefinition.get_datetime(event_value).timestamp() + + class BpmnWorkflow(BpmnBaseWorkflow): """ The engine that executes a BPMN workflow. This specialises the standard @@ -51,6 +148,8 @@ def __init__(self, spec, subprocess_specs=None, script_engine=None, **kwargs): self.subprocesses = {} self.bpmn_events = [] self.correlations = {} + self._waiting_task_index = _WaitingTaskIndex() + self._refreshing_waiting_tasks = False super(BpmnWorkflow, self).__init__(spec, **kwargs) for obj in self.spec.data_objects: @@ -129,7 +228,7 @@ def catch(self, event): for task in tasks: task.task_spec.catch(task, event) if len(tasks) > 0: - self.refresh_waiting_tasks() + self._refresh_caught_tasks(tasks) def send_event(self, event): """Allows this workflow to catch an externally generated event.""" @@ -142,7 +241,7 @@ def send_event(self, event): raise WorkflowException(f"This process is not waiting for {event.event_definition.name}") for task in tasks: task.task_spec.catch(task, event) - self.refresh_waiting_tasks() + self._refresh_caught_tasks(tasks) def get_events(self): """Returns the list of events that cannot be handled from within this workflow.""" @@ -164,8 +263,10 @@ def do_engine_steps(self, will_complete_task=None, did_complete_task=None): :param will_complete_task: Callback that will be called prior to completing a task :param did_complete_task: Callback that will be called after completing a task """ + self._refresh_due_waiting_tasks() count = self._do_engine_steps(will_complete_task, did_complete_task) while count > 0: + self._refresh_due_waiting_tasks() count = self._do_engine_steps(will_complete_task, did_complete_task) def _do_engine_steps(self, will_complete_task=None, did_complete_task=None): @@ -197,25 +298,58 @@ def update_workflow(wf): def refresh_waiting_tasks(self, will_refresh_task=None, did_refresh_task=None): """ - Refresh the state of all WAITING tasks. This will, for example, update - Catching Timer Events whose waiting time has passed. + Compatibility no-op. + + BPMN workflows now refresh WAITING task internals through engine steps, + targeted event catches, and task completion notifications. :param will_refresh_task: Callback that will be called prior to refreshing a task :param did_refresh_task: Callback that will be called after refreshing a task """ - def update_task(task): - if will_refresh_task is not None: - will_refresh_task(task) + pass + + def _waiting_task_state_changed(self, task, old_state, new_state): + self._waiting_task_index.task_state_changed(task, old_state, new_state) + + def _rebuild_waiting_task_index(self): + self._waiting_task_index = _WaitingTaskIndex() + workflows = [self] + list(self.subprocesses.values()) + for workflow in workflows: + for task in workflow.tasks.values(): + if task.state == TaskState.WAITING: + self._waiting_task_index.task_state_changed(task, None, TaskState.WAITING) + + def _refresh_internal_waiting_tasks(self): + if self._refreshing_waiting_tasks: + return + self._refreshing_waiting_tasks = True + try: + self._waiting_task_index.refresh_internal_tasks(self._refresh_waiting_task) + finally: + self._refreshing_waiting_tasks = False + + def _refresh_due_waiting_tasks(self): + if self._refreshing_waiting_tasks: + return + self._refreshing_waiting_tasks = True + try: + self._waiting_task_index.refresh_due_timers(self._refresh_waiting_task) + finally: + self._refreshing_waiting_tasks = False + + def _refresh_caught_tasks(self, tasks): + if self._refreshing_waiting_tasks: + return + self._refreshing_waiting_tasks = True + try: + self._waiting_task_index.refresh_tasks(tasks, self._refresh_waiting_task) + finally: + self._refreshing_waiting_tasks = False + + def _refresh_waiting_task(self, task): + if task.state == TaskState.WAITING: task.task_spec._update(task) - if did_refresh_task is not None: - did_refresh_task(task) - - for subprocess in sorted(self.get_active_subprocesses(), key=lambda v: v.depth, reverse=True): - for task in subprocess.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING): - update_task(task) - - for task in self.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING): - update_task(task) + self._waiting_task_index.reschedule_timer(task) def get_task_from_id(self, task_id): if task_id not in self.tasks: diff --git a/SpiffWorkflow/task.py b/SpiffWorkflow/task.py index 5a1ff2ba..0151adb8 100644 --- a/SpiffWorkflow/task.py +++ b/SpiffWorkflow/task.py @@ -292,9 +292,12 @@ def _set_state(self, value): """Force set the state on a task""" if value != self.state: + old_state = self._state elapsed = time.time() - self.last_state_change self.last_state_change = time.time() self._state = value + if hasattr(self.workflow, '_task_state_changed_notify'): + self.workflow._task_state_changed_notify(self, old_state, value) logger.info( f'State changed to {TaskState.get_name(value)}', extra=self.collect_log_extras({'elapsed': elapsed}) diff --git a/SpiffWorkflow/workflow.py b/SpiffWorkflow/workflow.py index ceaa8d3a..14ce896b 100644 --- a/SpiffWorkflow/workflow.py +++ b/SpiffWorkflow/workflow.py @@ -285,6 +285,8 @@ def _remove_task(self, task_id): task = self.tasks[task_id] for child in task.children: self._remove_task(child.id) + if hasattr(self, '_task_state_changed_notify'): + self._task_state_changed_notify(task, task.state, None) task.parent._children.remove(task.id) self.tasks.pop(task_id) diff --git a/tests/SpiffWorkflow/bpmn/WaitingTaskStressBenchmark.py b/tests/SpiffWorkflow/bpmn/WaitingTaskStressBenchmark.py new file mode 100644 index 00000000..fc18e811 --- /dev/null +++ b/tests/SpiffWorkflow/bpmn/WaitingTaskStressBenchmark.py @@ -0,0 +1,141 @@ +""" +Run with: + make RUN='uv run' waiting-task-stress + +Useful scale knobs: + SPIFF_WAITING_STRESS_TIMERS=500 + SPIFF_WAITING_STRESS_READY_STEPS=500 + SPIFF_WAITING_STRESS_DUE_TIMERS=50 + SPIFF_WAITING_STRESS_FUTURE_TIMERS=450 + +Optional guard for optimized branches: + SPIFF_WAITING_STRESS_MAX_TIMER_CHECKS=500 +""" + +import os +import time +from unittest.mock import patch + +from SpiffWorkflow import TaskState +from SpiffWorkflow.bpmn.specs.event_definitions.timer import DurationTimerEventDefinition + +from .BpmnWorkflowTestCase import BpmnWorkflowTestCase +from .waiting_task_stress import StressBpmnKind, WaitingTaskStressConfig, load_stress_workflow + + +class WaitingTaskStressBenchmark(BpmnWorkflowTestCase): + + def test_ready_hot_path_with_many_dormant_timers(self): + config = WaitingTaskStressConfig( + waiting_timers=_env_int("SPIFF_WAITING_STRESS_TIMERS", 100), + ready_steps=_env_int("SPIFF_WAITING_STRESS_READY_STEPS", 100), + ) + workflow = load_stress_workflow(self, StressBpmnKind.READY_HOT_PATH, config) + + with _count_duration_timer_checks() as counter: + started_at = time.perf_counter() + workflow.do_engine_steps() + elapsed = time.perf_counter() - started_at + + waiting_timers = _tasks_with_bpmn_id_prefix(workflow, TaskState.WAITING, "timer_wait_") + completed_hot_steps = _tasks_with_bpmn_id_prefix(workflow, TaskState.COMPLETED, "hot_step_") + + self.assertEqual(config.waiting_timers, len(waiting_timers)) + self.assertEqual(config.ready_steps, len(completed_hot_steps)) + _print_metrics( + "READY HOT PATH WITH DORMANT TIMERS", + { + "waiting_timers": config.waiting_timers, + "ready_steps": config.ready_steps, + "timer_has_fired_calls": counter.calls, + "elapsed_seconds": f"{elapsed:.6f}", + }, + ) + _assert_optional_max("SPIFF_WAITING_STRESS_MAX_TIMER_CHECKS", counter.calls, self) + + def test_staggered_timers_refresh_cost(self): + due_timers = _env_int("SPIFF_WAITING_STRESS_DUE_TIMERS", 10) + waiting_timers = _env_int("SPIFF_WAITING_STRESS_FUTURE_TIMERS", 90) + config = WaitingTaskStressConfig( + waiting_timers=waiting_timers, + due_timers=due_timers, + due_duration="PT0.01S", + ) + workflow = load_stress_workflow(self, StressBpmnKind.STAGGERED_TIMERS, config) + workflow.do_engine_steps() + + time.sleep(0.02) + with _count_duration_timer_checks() as counter: + started_at = time.perf_counter() + workflow.refresh_waiting_tasks() + workflow.do_engine_steps() + elapsed = time.perf_counter() - started_at + + waiting_timer_tasks = _tasks_with_bpmn_id_prefix(workflow, TaskState.WAITING, "timer_wait_") + completed_timer_tasks = _tasks_with_bpmn_id_prefix(workflow, TaskState.COMPLETED, "timer_wait_") + + self.assertEqual(waiting_timers, len(waiting_timer_tasks)) + self.assertEqual(due_timers, len(completed_timer_tasks)) + _print_metrics( + "STAGGERED TIMER REFRESH", + { + "due_timers": due_timers, + "future_timers": waiting_timers, + "timer_has_fired_calls": counter.calls, + "elapsed_seconds": f"{elapsed:.6f}", + }, + ) + _assert_optional_max("SPIFF_WAITING_STRESS_MAX_TIMER_CHECKS", counter.calls, self) + + +class _TimerCheckCounter: + def __init__(self): + self.calls = 0 + + +def _count_duration_timer_checks(): + counter = _TimerCheckCounter() + original = DurationTimerEventDefinition.has_fired + + def counted_has_fired(event_definition, task): + counter.calls += 1 + return original(event_definition, task) + + patcher = patch.object(DurationTimerEventDefinition, "has_fired", counted_has_fired) + + class TimerCheckContext: + def __enter__(self): + patcher.start() + return counter + + def __exit__(self, exc_type, exc_value, traceback): + patcher.stop() + + return TimerCheckContext() + + +def _tasks_with_bpmn_id_prefix(workflow, state, prefix): + return [ + task for task in workflow.get_tasks(state=state) + if task.task_spec.bpmn_id is not None and task.task_spec.bpmn_id.startswith(prefix) + ] + + +def _env_int(name, default): + value = os.environ.get(name) + return default if value is None else int(value) + + +def _assert_optional_max(env_name, actual, test_case): + expected = os.environ.get(env_name) + if expected is not None: + test_case.assertLessEqual(actual, int(expected)) + + +def _print_metrics(title, metrics): + print("\n" + "=" * 80) + print(f"WAITING TASK STRESS: {title}") + print("=" * 80) + for key, value in metrics.items(): + print(f" {key}: {value}") + print("=" * 80) diff --git a/tests/SpiffWorkflow/bpmn/WaitingTaskStressTest.py b/tests/SpiffWorkflow/bpmn/WaitingTaskStressTest.py new file mode 100644 index 00000000..6fe25ba4 --- /dev/null +++ b/tests/SpiffWorkflow/bpmn/WaitingTaskStressTest.py @@ -0,0 +1,110 @@ +import time +from unittest.mock import patch + +from SpiffWorkflow import TaskState +from SpiffWorkflow.bpmn.specs.event_definitions.timer import DurationTimerEventDefinition + +from .BpmnWorkflowTestCase import BpmnWorkflowTestCase +from .waiting_task_stress import StressBpmnKind, WaitingTaskStressConfig, load_stress_workflow + + +class WaitingTaskStressTest(BpmnWorkflowTestCase): + + def test_ready_hot_path_stress_fixture_keeps_many_dormant_waiting_timers(self): + config = WaitingTaskStressConfig(waiting_timers=6, ready_steps=5) + workflow = load_stress_workflow(self, StressBpmnKind.READY_HOT_PATH, config) + + workflow.do_engine_steps() + + waiting_timer_tasks = [ + task for task in workflow.get_tasks(state=TaskState.WAITING) + if task.task_spec.bpmn_id is not None and task.task_spec.bpmn_id.startswith("timer_wait_") + ] + completed_hot_steps = [ + task for task in workflow.get_tasks(state=TaskState.COMPLETED) + if task.task_spec.bpmn_id is not None and task.task_spec.bpmn_id.startswith("hot_step_") + ] + + self.assertEqual(config.waiting_timers, len(waiting_timer_tasks)) + self.assertEqual(config.ready_steps, len(completed_hot_steps)) + self.assertFalse(workflow.completed) + + def test_ready_hot_path_does_not_poll_dormant_timers_per_step(self): + config = WaitingTaskStressConfig(waiting_timers=8, ready_steps=6) + workflow = load_stress_workflow(self, StressBpmnKind.READY_HOT_PATH, config) + + with _count_duration_timer_checks() as counter: + workflow.do_engine_steps() + + self.assertLessEqual(counter.calls, config.waiting_timers) + + def test_refresh_waiting_tasks_is_noop_and_engine_steps_refresh_due_timers(self): + config = WaitingTaskStressConfig(waiting_timers=0, due_timers=1, due_duration="PT0.01S") + workflow = load_stress_workflow(self, StressBpmnKind.STAGGERED_TIMERS, config) + + workflow.do_engine_steps() + timer_task = workflow.get_tasks(state=TaskState.WAITING, spec_name="timer_wait_0")[0] + callbacks = [] + time.sleep(0.02) + + workflow.refresh_waiting_tasks(callbacks.append, callbacks.append) + + self.assertEqual(TaskState.WAITING, timer_task.state) + self.assertEqual([], callbacks) + + workflow.do_engine_steps() + + self.assertEqual(TaskState.COMPLETED, timer_task.state) + + def test_get_tasks_does_not_refresh_due_timers_by_inspection(self): + config = WaitingTaskStressConfig(waiting_timers=0, due_timers=1, due_duration="PT0.01S") + workflow = load_stress_workflow(self, StressBpmnKind.STAGGERED_TIMERS, config) + + workflow.do_engine_steps() + time.sleep(0.02) + + waiting_tasks = workflow.get_tasks(state=TaskState.WAITING, spec_name="timer_wait_0") + ready_tasks = workflow.get_tasks(state=TaskState.READY, spec_name="timer_wait_0") + + self.assertEqual(1, len(waiting_tasks)) + self.assertEqual(0, len(ready_tasks)) + + def test_due_timer_survives_save_restore_without_public_refresh(self): + config = WaitingTaskStressConfig(waiting_timers=0, due_timers=1, due_duration="PT0.01S") + self.workflow = load_stress_workflow(self, StressBpmnKind.STAGGERED_TIMERS, config) + + self.workflow.do_engine_steps() + timer_task = self.workflow.get_tasks(state=TaskState.WAITING, spec_name="timer_wait_0")[0] + self.save_restore() + time.sleep(0.02) + + self.workflow.do_engine_steps() + + timer_task = self.workflow.get_task_from_id(timer_task.id) + self.assertEqual(TaskState.COMPLETED, timer_task.state) + + +class _TimerCheckCounter: + def __init__(self): + self.calls = 0 + + +def _count_duration_timer_checks(): + counter = _TimerCheckCounter() + original = DurationTimerEventDefinition.has_fired + + def counted_has_fired(event_definition, task): + counter.calls += 1 + return original(event_definition, task) + + patcher = patch.object(DurationTimerEventDefinition, "has_fired", counted_has_fired) + + class TimerCheckContext: + def __enter__(self): + patcher.start() + return counter + + def __exit__(self, exc_type, exc_value, traceback): + patcher.stop() + + return TimerCheckContext() diff --git a/tests/SpiffWorkflow/bpmn/events/TimerCycleTest.py b/tests/SpiffWorkflow/bpmn/events/TimerCycleTest.py index 75a8236b..1d287753 100644 --- a/tests/SpiffWorkflow/bpmn/events/TimerCycleTest.py +++ b/tests/SpiffWorkflow/bpmn/events/TimerCycleTest.py @@ -48,7 +48,7 @@ def actual_test(self,save_restore = False): self.workflow.do_engine_steps() if save_restore: self.save_restore() - self.workflow.refresh_waiting_tasks() + self.workflow.do_engine_steps() events = self.workflow.waiting_events() refill = self.workflow.get_tasks(spec_name='Refill_Coffee') # Wait time is 0.1s, with a limit of 2 children, so by the 3rd iteration, the event should be complete diff --git a/tests/SpiffWorkflow/bpmn/events/TimerDurationTest.py b/tests/SpiffWorkflow/bpmn/events/TimerDurationTest.py index b29a787f..77886aba 100644 --- a/tests/SpiffWorkflow/bpmn/events/TimerDurationTest.py +++ b/tests/SpiffWorkflow/bpmn/events/TimerDurationTest.py @@ -35,7 +35,7 @@ def actual_test(self,save_restore = False): self.save_restore() self.workflow.script_engine = self.script_engine time.sleep(0.1) - self.workflow.refresh_waiting_tasks() + self.workflow.do_engine_steps() loopcount += 1 endtime = datetime.now() duration = endtime - starttime diff --git a/tests/SpiffWorkflow/bpmn/events/TimerIntermediateTest.py b/tests/SpiffWorkflow/bpmn/events/TimerIntermediateTest.py index 91326e0b..3e78088d 100644 --- a/tests/SpiffWorkflow/bpmn/events/TimerIntermediateTest.py +++ b/tests/SpiffWorkflow/bpmn/events/TimerIntermediateTest.py @@ -31,9 +31,6 @@ def testRunThroughHappy(self): time.sleep(0.02) self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) - self.workflow.refresh_waiting_tasks() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING))) - self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.workflow.do_engine_steps() + self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) diff --git a/tests/SpiffWorkflow/bpmn/waiting_task_stress.py b/tests/SpiffWorkflow/bpmn/waiting_task_stress.py new file mode 100644 index 00000000..a5379d85 --- /dev/null +++ b/tests/SpiffWorkflow/bpmn/waiting_task_stress.py @@ -0,0 +1,152 @@ +import os +from dataclasses import dataclass +from enum import Enum +from uuid import uuid4 + +from SpiffWorkflow.bpmn.workflow import BpmnWorkflow + + +class StressBpmnKind(Enum): + READY_HOT_PATH = "ready_hot_path" + STAGGERED_TIMERS = "staggered_timers" + + +@dataclass(frozen=True) +class WaitingTaskStressConfig: + waiting_timers: int = 100 + ready_steps: int = 100 + due_timers: int = 0 + future_duration: str = "PT24H" + due_duration: str = "PT0S" + + +def build_stress_bpmn(kind, config): + if kind == StressBpmnKind.READY_HOT_PATH: + return _build_ready_hot_path_bpmn(config) + if kind == StressBpmnKind.STAGGERED_TIMERS: + return _build_staggered_timers_bpmn(config) + raise ValueError(f"Unsupported stress BPMN kind: {kind}") + + +def load_stress_workflow(test_case, kind, config): + filename = write_stress_bpmn(test_case, kind, config) + try: + spec, subprocesses = test_case.load_workflow_spec(filename, "waiting_task_stress", validate=False) + return BpmnWorkflow(spec, subprocesses) + finally: + path = _data_path(filename) + if os.path.exists(path): + os.unlink(path) + + +def write_stress_bpmn(test_case, kind, config): + filename = f"_generated_waiting_task_stress_{kind.value}_{uuid4().hex}.bpmn" + path = _data_path(filename) + with open(path, "w") as bpmn_file: + bpmn_file.write(build_stress_bpmn(kind, config)) + return filename + + +def _data_path(filename): + return os.path.join(os.path.dirname(__file__), "data", filename) + + +def _build_ready_hot_path_bpmn(config): + timer_branches = [ + _timer_branch(idx, config.future_duration) + for idx in range(config.waiting_timers) + ] + timer_flows = [ + f'flow_split_timer_{idx}' + for idx in range(config.waiting_timers) + ] + return _definitions( + "\n".join([ + _start_and_split(timer_flows + ["flow_split_hot_0"]), + "\n".join(timer_branches), + _hot_path(config.ready_steps), + ]) + ) + + +def _build_staggered_timers_bpmn(config): + timer_count = config.waiting_timers + config.due_timers + timer_flows = [ + f'flow_split_timer_{idx}' + for idx in range(timer_count) + ] + branches = [] + for idx in range(timer_count): + duration = config.due_duration if idx < config.due_timers else config.future_duration + branches.append(_timer_branch(idx, duration)) + return _definitions("\n".join([ + _start_and_split(timer_flows), + "\n".join(branches), + ])) + + +def _definitions(process_body): + return f""" + + +{_indent(process_body, 4)} + + +""" + + +def _start_and_split(split_outgoing): + outgoing = "\n".join(split_outgoing) + return f""" + flow_start_split + + + flow_start_split +{_indent(outgoing, 2)} + +""" + + +def _timer_branch(idx, duration): + return f""" + flow_split_timer_{idx} + flow_timer_{idx}_end + + "{duration}" + + + + flow_timer_{idx}_end + + +""" + + +def _hot_path(ready_steps): + if ready_steps < 1: + raise ValueError("ready_steps must be at least 1") + + tasks = [] + flows = [''] + for idx in range(ready_steps): + incoming = "flow_split_hot_0" if idx == 0 else f"flow_hot_{idx - 1}_{idx}" + outgoing = "flow_hot_last_end" if idx == ready_steps - 1 else f"flow_hot_{idx}_{idx + 1}" + tasks.append(f""" + {incoming} + {outgoing} + hot_path_steps = hot_path_steps + 1 if 'hot_path_steps' in locals() else 1 +""") + if idx < ready_steps - 1: + flows.append( + f'' + ) + flows.append('flow_hot_last_end') + flows.append( + f'' + ) + return "\n".join(tasks + flows) + + +def _indent(text, spaces): + prefix = " " * spaces + return "\n".join(f"{prefix}{line}" if line else line for line in text.splitlines())