Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ tests-ind:
# Run the tests-ind target with stderr merged into stdout and pipe the
# combined output through scripts/test_times.py.
tests-timing:
@make tests-ind 2>&1 | ./scripts/test_times.py

# Run tests.SpiffWorkflow.bpmn.WaitingTaskStressBenchmark through unittest in
# verbose mode, using the $(RUN) command prefix.
.PHONY : waiting-task-stress
waiting-task-stress:
$(RUN) python -m unittest -v tests.SpiffWorkflow.bpmn.WaitingTaskStressBenchmark


wheel: clean
$(RUN) python -m build --sdist --wheel --outdir dist/
1 change: 1 addition & 0 deletions SpiffWorkflow/bpmn/serializer/default/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def from_dict(self, dct):
# Handle the remaining top workflow attributes
self.subprocesses_from_dict(dct['subprocesses'], workflow)
workflow.bpmn_events = self.registry.restore(dct.pop('bpmn_events', []))
workflow._rebuild_waiting_task_index()

return workflow

Expand Down
7 changes: 6 additions & 1 deletion SpiffWorkflow/bpmn/util/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def data_objects(self):
def get_tasks_iterator(self, first_task=None, **kwargs):
    """
    Build a task iterator for this workflow.

    :param first_task: the task to start from; any falsy value falls back to `self.task_tree`
    :param kwargs: passed through unchanged to `BpmnTaskIterator`
    :returns: a `BpmnTaskIterator` rooted at the chosen task
    """
    start = first_task if first_task else self.task_tree
    return BpmnTaskIterator(start, **kwargs)

def update_waiting_tasks(self):
    """
    Ask the top-level workflow to refresh its internally tracked WAITING tasks.

    The work is always delegated to `self.top_workflow`, whichever workflow
    this method is invoked on.
    """
    top = self.top_workflow
    top._refresh_internal_waiting_tasks()

def _task_state_changed_notify(self, task, old_state, new_state):
    """
    Report a task state transition to the top-level workflow.

    :param task: the task whose state changed
    :param old_state: the state the task had before the change
    :param new_state: the state the task has after the change
    """
    top = self.top_workflow
    top._waiting_task_state_changed(task, old_state, new_state)


class BpmnSubWorkflow(BpmnBaseWorkflow):

Expand Down Expand Up @@ -68,4 +74,3 @@ def collect_log_extras(self, dct=None):
dct = super().collect_log_extras()
dct.update({'parent_task_id': self.parent_task_id})
return dct

166 changes: 150 additions & 16 deletions SpiffWorkflow/bpmn/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import heapq
from datetime import datetime, timezone

from SpiffWorkflow.task import Task
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.exceptions import WorkflowException

from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent
from SpiffWorkflow.bpmn.specs.mixins.events.start_event import StartEvent
from SpiffWorkflow.bpmn.specs.mixins.subworkflow_task import CallActivity
from SpiffWorkflow.bpmn.specs.event_definitions.multiple import MultipleEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.item_aware_event import CodeEventDefinition

from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit
Expand All @@ -33,6 +38,98 @@
from .script_engine.python_engine import PythonScriptEngine


class _WaitingTaskIndex:
    """
    Index of WAITING tasks, with a heap of due times for timer tasks.

    The index is kept up to date through `task_state_changed`.  Timer due
    times live in a min-heap so that due timers can be found without
    scanning every waiting task.  Heap entries are never deleted in place:
    `timer_due_at` is the authoritative record and entries that no longer
    match it are discarded when they are popped.
    """

    def __init__(self):
        # Every indexed WAITING task, keyed by task id.
        self.waiting_tasks = {}
        # The subset of waiting tasks whose spec catches a timer event.
        self.timer_tasks = {}
        # Current due time (POSIX timestamp) for each scheduled timer task id.
        self.timer_due_at = {}
        # Heap of (due_at, sequence, task_id) tuples; may hold stale entries.
        self.timer_heap = []
        # Unique, increasing tie-breaker so that entries with equal due times
        # are ordered by insertion and task ids are never compared.
        self._sequence = 0

    def task_state_changed(self, task, old_state, new_state):
        """
        Update the index for a task state transition.

        :param task: the task whose state changed
        :param old_state: the previous state (the task is dropped if it was WAITING)
        :param new_state: the new state (the task is indexed if it is WAITING)
        """
        if old_state == TaskState.WAITING:
            self._remove(task)
        if new_state == TaskState.WAITING:
            self._add(task)

    def refresh_internal_tasks(self, refresh_task):
        """
        Refresh every non-timer waiting task, then every due timer task.

        :param refresh_task: callable invoked with each task to refresh
        """
        # Iterate over a snapshot: refreshing may change task states and
        # therefore mutate `waiting_tasks`.
        for task in list(self.waiting_tasks.values()):
            if task.id not in self.timer_tasks and task.state == TaskState.WAITING:
                refresh_task(task)
        self.refresh_due_timers(refresh_task)

    def refresh_due_timers(self, refresh_task):
        """
        Refresh the timer tasks whose due time has been reached.

        Each (task, due time) pair is refreshed at most once per call.  If a
        task is still WAITING with the same due time after being refreshed,
        its entry is put back on the heap and retried on the next call
        instead of being popped again immediately, which would never
        terminate.  A task whose due time changes (e.g. a timer catching up
        on several cycles) is still refreshed repeatedly within one call.

        :param refresh_task: callable invoked with each task to refresh
        """
        self._schedule_missing_timer_due_times(refresh_task)
        now = datetime.now(timezone.utc).timestamp()
        handled = set()
        deferred = {}
        try:
            while self.timer_heap and self.timer_heap[0][0] <= now:
                entry = heapq.heappop(self.timer_heap)
                due_at, _sequence, task_id = entry
                # Stale entry: the task was removed or rescheduled since.
                if self.timer_due_at.get(task_id) != due_at:
                    continue
                task = self.timer_tasks.get(task_id)
                if task is None or task.state != TaskState.WAITING:
                    continue
                key = (task_id, due_at)
                if key in handled:
                    # Already refreshed for this due time without progress;
                    # keep a single entry so the timer is retried later.
                    deferred[key] = entry
                    continue
                handled.add(key)
                refresh_task(task)
        finally:
            # Restore deferred entries even if a refresh raised.
            for entry in deferred.values():
                heapq.heappush(self.timer_heap, entry)

    def refresh_tasks(self, tasks, refresh_task):
        """
        Refresh exactly the given tasks, in order.

        :param tasks: iterable of tasks to refresh
        :param refresh_task: callable invoked with each task
        """
        for task in tasks:
            refresh_task(task)

    def reschedule_timer(self, task):
        """
        Recompute the due time of `task` if it is an indexed timer task.

        :param task: the task to reschedule; ignored when it is not in `timer_tasks`
        """
        if task.id in self.timer_tasks:
            self._schedule_timer(task)

    def _add(self, task):
        """Index `task` as waiting and, for timer tasks, schedule its due time."""
        self.waiting_tasks[task.id] = task
        if self._is_timer_task(task):
            self.timer_tasks[task.id] = task
            self._schedule_timer(task)

    def _remove(self, task):
        """Drop `task` from the index; its heap entries become stale and are skipped later."""
        self.waiting_tasks.pop(task.id, None)
        self.timer_tasks.pop(task.id, None)
        self.timer_due_at.pop(task.id, None)

    def _schedule_missing_timer_due_times(self, refresh_task):
        """
        Refresh WAITING timer tasks that have no recorded due time.

        NOTE(review): this method does not schedule anything itself; it
        appears to rely on `refresh_task` leading to `reschedule_timer` once
        the task has an event value -- confirm against the caller.
        """
        for task in list(self.timer_tasks.values()):
            if task.state != TaskState.WAITING:
                continue
            if task.id not in self.timer_due_at:
                refresh_task(task)

    def _schedule_timer(self, task):
        """Record the current due time of `task` and push a matching heap entry."""
        due_at = self._get_timer_due_at(task)
        if due_at is None:
            # No due time available: forget any previously recorded one.
            self.timer_due_at.pop(task.id, None)
            return
        self.timer_due_at[task.id] = due_at
        self._sequence += 1
        heapq.heappush(self.timer_heap, (due_at, self._sequence, task.id))

    def _is_timer_task(self, task):
        """Return True if the task's spec is a catching event with a timer definition."""
        return isinstance(task.task_spec, CatchingEvent) and self._has_timer_definition(task.task_spec.event_definition)

    def _has_timer_definition(self, event_definition):
        """Return True if the definition is a timer, or a multiple definition containing one."""
        if isinstance(event_definition, TimerEventDefinition):
            return True
        if isinstance(event_definition, MultipleEventDefinition):
            return any(self._has_timer_definition(definition) for definition in event_definition.event_definitions)
        return False

    def _get_timer_due_at(self, task):
        """
        Return the due time of `task` as a POSIX timestamp, or None.

        The value is derived from the task's internal `event_value`:

        * missing -> None
        * a dict with `cycles == 0` -> 0 (always due)
        * a dict -> the timestamp of its `next` entry, or None if absent
        * anything else -> the timestamp of the value itself

        Values are parsed with `TimerEventDefinition.get_datetime`.
        """
        event_value = task._get_internal_data('event_value')
        if event_value is None:
            return None
        if isinstance(event_value, dict):
            if event_value.get('cycles') == 0:
                return 0
            next_event = event_value.get('next')
            if next_event is None:
                return None
            return TimerEventDefinition.get_datetime(next_event).timestamp()
        return TimerEventDefinition.get_datetime(event_value).timestamp()


class BpmnWorkflow(BpmnBaseWorkflow):
"""
The engine that executes a BPMN workflow. This specialises the standard
Expand All @@ -51,6 +148,8 @@ def __init__(self, spec, subprocess_specs=None, script_engine=None, **kwargs):
self.subprocesses = {}
self.bpmn_events = []
self.correlations = {}
self._waiting_task_index = _WaitingTaskIndex()
self._refreshing_waiting_tasks = False
super(BpmnWorkflow, self).__init__(spec, **kwargs)

for obj in self.spec.data_objects:
Expand Down Expand Up @@ -129,7 +228,7 @@ def catch(self, event):
for task in tasks:
task.task_spec.catch(task, event)
if len(tasks) > 0:
self.refresh_waiting_tasks()
self._refresh_caught_tasks(tasks)

def send_event(self, event):
"""Allows this workflow to catch an externally generated event."""
Expand All @@ -142,7 +241,7 @@ def send_event(self, event):
raise WorkflowException(f"This process is not waiting for {event.event_definition.name}")
for task in tasks:
task.task_spec.catch(task, event)
self.refresh_waiting_tasks()
self._refresh_caught_tasks(tasks)

def get_events(self):
"""Returns the list of events that cannot be handled from within this workflow."""
Expand All @@ -164,8 +263,10 @@ def do_engine_steps(self, will_complete_task=None, did_complete_task=None):
:param will_complete_task: Callback that will be called prior to completing a task
:param did_complete_task: Callback that will be called after completing a task
"""
self._refresh_due_waiting_tasks()
count = self._do_engine_steps(will_complete_task, did_complete_task)
while count > 0:
self._refresh_due_waiting_tasks()
count = self._do_engine_steps(will_complete_task, did_complete_task)

def _do_engine_steps(self, will_complete_task=None, did_complete_task=None):
Expand Down Expand Up @@ -197,25 +298,58 @@ def update_workflow(wf):

def refresh_waiting_tasks(self, will_refresh_task=None, did_refresh_task=None):
"""
Refresh the state of all WAITING tasks. This will, for example, update
Catching Timer Events whose waiting time has passed.
Compatibility no-op.

BPMN workflows now refresh WAITING task internals through engine steps,
targeted event catches, and task completion notifications.

:param will_refresh_task: Callback that will be called prior to refreshing a task
:param did_refresh_task: Callback that will be called after refreshing a task
"""
def update_task(task):
if will_refresh_task is not None:
will_refresh_task(task)
pass

def _waiting_task_state_changed(self, task, old_state, new_state):
    """
    Forward a task state transition to the waiting-task index.

    :param task: the task whose state changed
    :param old_state: the state the task had before the change
    :param new_state: the state the task has after the change
    """
    index = self._waiting_task_index
    index.task_state_changed(task, old_state, new_state)

def _rebuild_waiting_task_index(self):
    """
    Replace the waiting-task index with a fresh one built from current task states.

    Every WAITING task of this workflow and of each workflow in
    `self.subprocesses` is registered in the new index.
    """
    index = self._waiting_task_index = _WaitingTaskIndex()
    for workflow in (self, *self.subprocesses.values()):
        waiting = (task for task in workflow.tasks.values() if task.state == TaskState.WAITING)
        for task in waiting:
            # There is no previous state to report when rebuilding from scratch.
            index.task_state_changed(task, None, TaskState.WAITING)

def _refresh_internal_waiting_tasks(self):
    """
    Refresh the indexed waiting tasks via the index's `refresh_internal_tasks`.

    Does nothing when a refresh is already in progress; the
    `_refreshing_waiting_tasks` flag is always cleared afterwards, even if
    the refresh raises.
    """
    if not self._refreshing_waiting_tasks:
        self._refreshing_waiting_tasks = True
        try:
            index = self._waiting_task_index
            index.refresh_internal_tasks(self._refresh_waiting_task)
        finally:
            self._refreshing_waiting_tasks = False

def _refresh_due_waiting_tasks(self):
    """
    Refresh only the timer tasks that are due, via the index's `refresh_due_timers`.

    Does nothing when a refresh is already in progress; the
    `_refreshing_waiting_tasks` flag is always cleared afterwards, even if
    the refresh raises.
    """
    if not self._refreshing_waiting_tasks:
        self._refreshing_waiting_tasks = True
        try:
            index = self._waiting_task_index
            index.refresh_due_timers(self._refresh_waiting_task)
        finally:
            self._refreshing_waiting_tasks = False

def _refresh_caught_tasks(self, tasks):
    """
    Refresh exactly the given tasks, via the index's `refresh_tasks`.

    Does nothing when a refresh is already in progress; the
    `_refreshing_waiting_tasks` flag is always cleared afterwards, even if
    the refresh raises.

    NOTE(review): a call made while another refresh is running is dropped,
    not queued -- confirm that the tasks passed in such a call get
    refreshed by some other path.

    :param tasks: the tasks to refresh
    """
    if not self._refreshing_waiting_tasks:
        self._refreshing_waiting_tasks = True
        try:
            index = self._waiting_task_index
            index.refresh_tasks(tasks, self._refresh_waiting_task)
        finally:
            self._refreshing_waiting_tasks = False

def _refresh_waiting_task(self, task):
if task.state == TaskState.WAITING:
task.task_spec._update(task)
if did_refresh_task is not None:
did_refresh_task(task)

for subprocess in sorted(self.get_active_subprocesses(), key=lambda v: v.depth, reverse=True):
for task in subprocess.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING):
update_task(task)

for task in self.get_tasks_iterator(skip_subprocesses=True, state=TaskState.WAITING):
update_task(task)
self._waiting_task_index.reschedule_timer(task)

def get_task_from_id(self, task_id):
if task_id not in self.tasks:
Expand Down
3 changes: 3 additions & 0 deletions SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,12 @@ def _set_state(self, value):
"""Force set the state on a task"""

if value != self.state:
old_state = self._state
elapsed = time.time() - self.last_state_change
self.last_state_change = time.time()
self._state = value
if hasattr(self.workflow, '_task_state_changed_notify'):
self.workflow._task_state_changed_notify(self, old_state, value)
logger.info(
f'State changed to {TaskState.get_name(value)}',
extra=self.collect_log_extras({'elapsed': elapsed})
Expand Down
2 changes: 2 additions & 0 deletions SpiffWorkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ def _remove_task(self, task_id):
task = self.tasks[task_id]
for child in task.children:
self._remove_task(child.id)
if hasattr(self, '_task_state_changed_notify'):
self._task_state_changed_notify(task, task.state, None)
task.parent._children.remove(task.id)
self.tasks.pop(task_id)

Expand Down
Loading
Loading