Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "ros2-robot-interface"
version = "0.2.1"
version = "0.3.1"
description = "Standalone ROS 2 robot interface for communicating with robots through topics"
authors = [
{name = "Huang Zhenbiao", email = "biao876990970@hotmail.com"}
Expand Down
2 changes: 1 addition & 1 deletion ros2_robot_interface/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class ROS2RobotInterfaceConfig:
# ============================================================================
# 状态机切换(可选,手动配置)
# ============================================================================
auto_switch_fsm_before_control: bool = True # 【可选】是否在发送部分控制指令前自动发送 FSM 切换命令,默认关闭
fsm_state_switch_settle_time: float = 0.3 # 【可选】发送 FSM 状态切换命令后的等待时间(单位:秒),用于确保对端完成状态切换

# ============================================================================
Expand Down Expand Up @@ -228,4 +229,3 @@ def default_bimanual(
**kwargs,
)


63 changes: 34 additions & 29 deletions ros2_robot_interface/handler/arm_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from rclpy.subscription import Subscription
from std_msgs.msg import Float64MultiArray

from ..constants import FSM_MOVEJ
from ..constants import FSM_MOVEJ, FSM_OCS2
from ..utils.exceptions import ROS2NotConnectedError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -170,6 +170,15 @@ def send_target(self, pose: Pose) -> None:
"""
if self.target_pub is None:
raise ROS2NotConnectedError(f"{self.label} target publisher not initialized")

if self.config.auto_switch_fsm_before_control and self.fsm_command_callback is not None:
try:
current = self.query_fsm_command()
if current != FSM_OCS2:
self.fsm_command_callback(FSM_OCS2)
logger.debug(f"{self.label}: auto-switched FSM to OCS2 for arm pose control")
except Exception as e:
logger.warning(f"{self.label}: failed to auto-switch FSM to OCS2: {e}")

# 清除旧的 current target,避免在收到新目标前误判为已到达
self.latest_target_pose = None
Expand Down Expand Up @@ -208,6 +217,15 @@ def send_target_stamped(self, frame_id: str | Pose | None = None, pose: Optional
raise ValueError(
f"{self.label}: frame_id is required because no default frame_id is available yet"
)

if self.config.auto_switch_fsm_before_control and self.fsm_command_callback is not None:
try:
current = self.query_fsm_command()
if current != FSM_OCS2:
self.fsm_command_callback(FSM_OCS2)
logger.debug(f"{self.label}: auto-switched FSM to OCS2 for arm pose control")
except Exception as e:
logger.warning(f"{self.label}: failed to auto-switch FSM to OCS2: {e}")

# 清除旧的 current target,避免在收到新目标前误判为已到达
self.latest_target_pose = None
Expand Down Expand Up @@ -272,34 +290,22 @@ def send_joint_positions(self, positions: List[float]) -> None:
f"Joint controller topic not found."
)

# 如果没有 FSM 回调,抛出异常
if self.fsm_command_callback is None:
raise ValueError(
f"{self.label}: FSM command callback is required for send_joint_positions(). "
f"Please initialize ArmHandler with fsm_command_callback."
)

# 发送 FSM 命令切换到 MOVEJ 状态(若已查询到当前为 MOVEJ 则跳过,避免重复切换与等待)
skip_fsm = False
current = self.query_fsm_command()
if current is not None and current == FSM_MOVEJ:
skip_fsm = True
print(
f"{self.label}: FSM already MOVEJ (command={current}), skipping HOLD/MOVEJ switch",
flush=True,
)
if not skip_fsm:
try:
self.fsm_command_callback(2)
self.fsm_command_callback(4)
print(
f"{self.label}: Automatically switched to MOVEJ state for arm joint control",
flush=True,
if self.config.auto_switch_fsm_before_control:
# 如果启用自动切换但没有 FSM 回调,抛出异常
if self.fsm_command_callback is None:
raise ValueError(
f"{self.label}: FSM command callback is required for send_joint_positions() "
f"when auto_switch_fsm_before_control=True."
)
# 注意:状态切换是异步的,如果需要确保状态切换完成后再发送关节位置,
# 可以在调用此函数后自行等待,或确保当前已在 MOVEJ 状态
except Exception as e:
print(f"{self.label}: Failed to switch to MOVEJ state: {e}", flush=True)

# 自动切到 MOVEJ(若已是 MOVEJ 则跳过)
current = self.query_fsm_command()
if current != FSM_MOVEJ:
try:
self.fsm_command_callback(FSM_MOVEJ)
logger.debug(f"{self.label}: auto-switched FSM to MOVEJ for arm joint control")
except Exception as e:
logger.warning(f"{self.label}: failed to auto-switch FSM to MOVEJ: {e}")

msg = Float64MultiArray()
msg.data = positions
Expand Down Expand Up @@ -387,4 +393,3 @@ def cleanup(self) -> None:
if self.joint_controller_pub:
self.joint_controller_pub.destroy()
self.joint_controller_pub = None

Loading