diff --git a/synchros2/synchros2/executors.py b/synchros2/synchros2/executors.py index bf79212..2eb02a3 100644 --- a/synchros2/synchros2/executors.py +++ b/synchros2/synchros2/executors.py @@ -670,6 +670,10 @@ def bind(self, callback_group: rclpy.callback_groups.CallbackGroup, thread_pool: self._logger.debug(f"Binding {callback_group_name} to thread pool #{thread_pool_index}...") self._callback_group_affinity[callback_group] = thread_pool + def _spin_once_impl(self, *args: typing.Any, **kwargs: typing.Any) -> None: + """Spin the executor once (for Jazzy and beyond).""" + self._do_spin_once(*args, **kwargs) + def _do_spin_once(self, *args: typing.Any, **kwargs: typing.Any) -> None: with self._spin_lock: try: @@ -787,9 +791,15 @@ def shutdown(self, timeout_sec: typing.Optional[float] = None) -> bool: with self._spin_lock: # rclpy.executors.Executor base implementation leaves tasks # unawaited upon shutdown. Do the housekeepng. - known_tasks = [ - AutoScalingMultiThreadedExecutor.Task(task, entity, node) for task, entity, node in self._tasks - ] + list(self._work_in_progress) + if hasattr(self, "_tasks"): # ROS 2 Humble + known_tasks = [ + AutoScalingMultiThreadedExecutor.Task(task, entity, node) for task, entity, node in self._tasks + ] + list(self._work_in_progress) + else: # ROS 2 Jazzy and beyond + known_tasks = [ + AutoScalingMultiThreadedExecutor.Task(task, task_data.source_entity, task_data.source_node) + for task, task_data in self._pending_tasks.items() + ] + list(self._work_in_progress) for task in known_tasks: task.cancel() return done