Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
<exec_depend>rclpy</exec_depend>
<exec_depend>tf2_ros</exec_depend>
<exec_depend>rmcs_local_map</exec_depend>

<exec_depend>rmcs-relocation</exec_depend>

<export>
<build_type>ament_cmake</build_type>
</export>
Expand Down
53 changes: 52 additions & 1 deletion src/cxx/component.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
#endif

#include "cxx/context.hh"
#include "cxx/util/localization/engine.hh"
#include "cxx/util/navigation/navigation.hh"
#include "cxx/util/node_mixin.hh"

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <Eigen/Geometry>
#include <ament_index_cpp/get_package_share_directory.hpp>
Expand Down Expand Up @@ -43,6 +49,8 @@ class Navigation

details::Context context;
details::Navigation navigation;
std::unique_ptr<Localization> localization;
bool relocalization_enabled = true;

struct Command {
OutputInterface<Eigen::Vector2d> chassis_speed;
Expand Down Expand Up @@ -102,6 +110,34 @@ class Navigation
api.set_function("update_chassis_vel", [this](double x, double y) {
*command.chassis_speed = Eigen::Vector2d{x, y};
});
// 三个 mode 共享同一个 disabled-guard + 委托模板。lua 端别名(initial / local_ / wide)
const auto bind_relocalize = [this, &api](std::string_view name, RelocalizeMode mode) {
api.set_function(
std::string{name}, [this, name, mode](double x, double y, double yaw) {
if (!relocalization_enabled || !localization) {
warn("{} ignored: disabled", name);
return false;
}
return localization->relocalize(mode, x, y, yaw);
});
};
bind_relocalize("relocalize_initial", RelocalizeMode::Initial);
bind_relocalize("relocalize_local", RelocalizeMode::Local);
bind_relocalize("relocalize_wide", RelocalizeMode::Wide);

// status 转 lua table:disabled/enabled。
api.set_function("relocalize_status", [this]() {
const auto status = (relocalization_enabled && localization)
? localization->relocalize_status()
: RelocalizeStatus{.message = "disabled"};
return lua->create_table_with(
"state", static_cast<int>(status.state), "success", status.success,
"message", status.message, "fitness_score", status.fitness_score, "confidence",
status.confidence, "estimated_x", status.estimated_x, "estimated_y",
status.estimated_y, "estimated_z", status.estimated_z, "estimated_qx",
status.estimated_qx, "estimated_qy", status.estimated_qy, "estimated_qz",
status.estimated_qz, "estimated_qw", status.estimated_qw);
});
}

auto make_option_injection() {
Expand Down Expand Up @@ -232,6 +268,21 @@ class Navigation
, navigation{*this} {

mock_context = param<bool>("mock_context");
relocalization_enabled =
has_parameter("enable_relocalization")
? get_parameter_or<bool>("enable_relocalization", true)
: true;
if (relocalization_enabled) {
auto config = Localization::Config{ .rclcpp = *this };
if (has_parameter("localization.service_name"))
config.service_name = get_parameter("localization.service_name").as_string();
if (has_parameter("localization.request_timeout_sec"))
config.request_timeout_sec =
get_parameter("localization.request_timeout_sec").as_double();
localization = std::make_unique<Localization>(std::move(config));
} else {
warn("relocalization is disabled by parameter enable_relocalization=false");
}

context.init(io_mutex, mock_context);
command.init(*this);
Expand Down
259 changes: 195 additions & 64 deletions src/cxx/util/localization/engine.cc
Original file line number Diff line number Diff line change
@@ -1,74 +1,207 @@
#include "cxx/util/localization/engine.hh"
#include "cxx/util/node_mixin.hh"

#include <pcl/io/pcd_io.h>
#include <pcl/point_cloud.h>
#include <pcl/point_types.h>
#include <pcl/registration/ndt.h>

#include <pcl_conversions/pcl_conversions.h>
#include <rclcpp/subscription.hpp>
#include <sensor_msgs/msg/point_cloud2.hpp>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

#include <geometry_msgs/msg/pose.hpp>
#include <rclcpp/client.hpp>
#include <rclcpp/logging.hpp>
#include <rclcpp/timer.hpp>
#include <rmcs_msgs/srv/relocalize.hpp>
#include <tf2/LinearMath/Quaternion.hpp>

namespace rmcs::navigation {

struct Localization::Impl {
Config config;
LoggerWrap<rclcpp::Node> logging;

struct Context {
using Point = pcl::PointXYZ;
using Cloud = pcl::PointCloud<Point>;

std::shared_ptr<rclcpp::Subscription<sensor_msgs::msg::PointCloud2>> subscription;
std::atomic<bool> stop_collecting = false;

std::shared_ptr<Cloud> map;
std::shared_ptr<Cloud> collected = std::make_shared<Cloud>();

explicit Context(Config& config) {
const auto& filename = config.map_filename;
if (pcl::io::loadPCDFile<Point>(filename, *map) != 0)
throw std::runtime_error{"Couldn't read " + filename};

auto& rclcpp = config.rclcpp;
subscription = rclcpp.create_subscription<sensor_msgs::msg::PointCloud2>(
config.topic_registered, 10,
[this](const std::unique_ptr<sensor_msgs::msg::PointCloud2>& msg) {
if (!stop_collecting) {
auto received = Cloud{};
pcl::fromROSMsg(*msg, received);

*collected += received;
}
});
}
} context;
namespace {

using Relocalize = rmcs_msgs::srv::Relocalize;
using RelocalizeClient = rclcpp::Client<Relocalize>;

auto yaw_to_pose(double x, double y, double yaw) -> geometry_msgs::msg::Pose {
auto q = tf2::Quaternion{};
q.setRPY(0.0, 0.0, yaw);
q.normalize();
auto pose = geometry_msgs::msg::Pose{};
pose.position.x = x;
pose.position.y = y;
pose.position.z = 0.0;
pose.orientation.x = q.x();
pose.orientation.y = q.y();
pose.orientation.z = q.z();
pose.orientation.w = q.w();
return pose;
}

auto failed_status(std::string message) -> RelocalizeStatus {
return RelocalizeStatus{
.state = RelocalizeState::FAILED,
.success = false,
.message = std::move(message),
};
}

auto status_from_response(const Relocalize::Response& response, bool success) -> RelocalizeStatus {
const auto& pose = response.estimated_world_base;
return RelocalizeStatus{
.state = success ? RelocalizeState::SUCCEEDED : RelocalizeState::FAILED, .success = success,
.message = response.message, .fitness_score = response.fitness_score, .confidence = response.confidence,
.estimated_x = pose.position.x, .estimated_y = pose.position.y, .estimated_z = pose.position.z,
.estimated_qx = pose.orientation.x, .estimated_qy = pose.orientation.y, .estimated_qz = pose.orientation.z,
.estimated_qw = pose.orientation.w,
};
}

pcl::NormalDistributionsTransform<Context::Point, Context::Point> engine;
constexpr auto mode_to_msg(RelocalizeMode mode) -> std::uint8_t {
switch (mode) {
case RelocalizeMode::Initial: return Relocalize::Request::MODE_INITIAL;
case RelocalizeMode::Local: return Relocalize::Request::MODE_LOCAL;
case RelocalizeMode::Wide: return Relocalize::Request::MODE_WIDE;
}
return Relocalize::Request::MODE_INITIAL;
}

explicit Impl(Localization::Config config)
: config{std::move(config)}
, logging{config.rclcpp}
, context{config} {
} // namespace

/// 一次重定位请求的全生命周期。lambda 通过 shared_ptr 持有。
struct Session {
mutable std::mutex mutex;
std::shared_ptr<RelocalizeClient> client;
rclcpp::Logger logger;
std::string service_name;

std::optional<std::int64_t> pending_id;
RelocalizeStatus last_status{};
rclcpp::TimerBase::SharedPtr timeout_timer;

Session(std::shared_ptr<RelocalizeClient> c, rclcpp::Logger lg, std::string name)
: client{std::move(c)}, logger{std::move(lg)}, service_name{std::move(name)} {}

/// 抢占一个 in-flight 槽位。若已在飞行中返回 false。
auto begin() -> bool {
auto lock = std::scoped_lock{mutex};
if (last_status.state == RelocalizeState::IN_FLIGHT)
return false;
pending_id.reset();
last_status =
RelocalizeStatus{.state = RelocalizeState::IN_FLIGHT, .message = "in_flight"};
return true;
}

engine.setTransformationEpsilon(config.ndt_result_epsilon); // 收敛判定阈值
engine.setStepSize(config.ndt_step_size); // More-Thuente 线搜索最大步长
engine.setResolution(config.ndt_resolution); // NDT 网格分辨率
engine.setMaximumIterations(config.ndt_max_iterations); // 最大迭代次数
auto track(std::int64_t id) -> void {
auto lock = std::scoped_lock{mutex};
pending_id = id;
}

auto start_collecting(std::chrono::seconds seconds) -> std::expected<void, std::string> {
std::ignore = this;
std::ignore = seconds;
return {};
/// 写入终态。expected_id != nullopt 时仅当与当前 pending 一致才接受(过滤迟到响应)。
auto end(RelocalizeStatus status, std::optional<std::int64_t> expected_id = std::nullopt)
-> bool {
auto lock = std::scoped_lock{mutex};
if (expected_id && pending_id != *expected_id)
return false;
clear_locked();
last_status = std::move(status);
return true;
}

auto start_localizing(const Eigen::Isometry3d& initial_solution)
-> std::future<std::expected<Eigen::Isometry3d, std::string>> {
std::ignore = this;
std::ignore = initial_solution;
return {};
auto snapshot() const -> RelocalizeStatus {
auto lock = std::scoped_lock{mutex};
return last_status;
}

/// 必须在持锁状态下调用:取消定时器、从 client 移除 pending 条目。
auto clear_locked() -> void {
if (timeout_timer) {
timeout_timer->cancel();
timeout_timer.reset();
}
if (pending_id) {
client->remove_pending_request(*pending_id);
pending_id.reset();
}
}
};

struct Localization::Impl : NodeMixin {
Config config;
rclcpp::Node& node;
std::shared_ptr<Session> session;

explicit Impl(Localization::Config cfg)
: config{std::move(cfg)}
, node{config.rclcpp}
, session{std::make_shared<Session>(
node.create_client<Relocalize>(config.service_name), node.get_logger(),
config.service_name)} {}

~Impl() {
auto lock = std::scoped_lock{session->mutex};
session->clear_locked();
}

auto get_logger() const { return node.get_logger(); }

auto arm_timeout(const std::shared_ptr<Session>& s) -> void {
const auto timeout = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>{std::max(0.1, config.request_timeout_sec)});
auto timer = node.create_wall_timer(timeout, [s]() {
if (s->end(failed_status("request timeout: " + s->service_name)))
RCLCPP_WARN(s->logger, "relocalize request timeout: %s", s->service_name.c_str());
});
auto lock = std::scoped_lock{s->mutex};
if (s->timeout_timer)
s->timeout_timer->cancel();
s->timeout_timer = std::move(timer);
}

auto send(RelocalizeMode mode, double x, double y, double yaw) -> bool {
if (!session->begin()) {
warn("relocalize skipped: previous request is still in flight");
return false;
}

if (!session->client->service_is_ready()
&& !session->client->wait_for_service(std::chrono::seconds(0))) {
warn("relocalize service unavailable: {}", config.service_name);
session->end(failed_status("service unavailable: " + config.service_name));
return false;
}

auto request = std::make_shared<Relocalize::Request>();
request->mode = mode_to_msg(mode);
request->initial_guess_world_base = yaw_to_pose(x, y, yaw);

arm_timeout(session);

// executor 异步触发 callback。
auto pending_id = std::make_shared<std::int64_t>(-1);
auto s = session;
auto result = session->client->async_send_request(
std::move(request), [s, pending_id](RelocalizeClient::SharedFuture future) {
auto response = future.get();
if (!response) {
RCLCPP_WARN(s->logger, "relocalize response is null");
s->end(failed_status("response is null"), *pending_id);
return;
}
if (!response->success) {
RCLCPP_WARN(s->logger, "relocalize failed: %s", response->message.c_str());
s->end(status_from_response(*response, false), *pending_id);
return;
}
RCLCPP_INFO(
s->logger, "relocalize success: score=%.4f, confidence=%.3f",
response->fitness_score, response->confidence);
s->end(status_from_response(*response, true), *pending_id);
});
*pending_id = result.request_id;
session->track(result.request_id);
Comment on lines +149 to +203
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "engine.cc" | grep -E "localization|util"

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 116


🏁 Script executed:

head -300 src/cxx/util/localization/engine.cc | tail -200

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 7393


🏁 Script executed:

# Get the Session class definition and end() method
rg -A 15 "class Session" src/cxx/util/localization/engine.cc

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 60


🏁 Script executed:

# Check the end() method signature and implementation
rg -B 2 -A 15 "Session::end\(" src/cxx/util/localization/engine.cc

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 60


🏁 Script executed:

# Verify the async callback and race condition context
sed -n '149,203p' src/cxx/util/localization/engine.cc

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 2718


🏁 Script executed:

# Check git log for recent changes to engine.cc
git log --oneline -20 src/cxx/util/localization/engine.cc

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 97


🏁 Script executed:

# Check if there are any other versions or branches with fixes
git show HEAD:src/cxx/util/localization/engine.cc | sed -n '149,203p'

Repository: Alliance-Algorithm/rmcs-navigation

Length of output: 2718


响应回调和超时回调需要统一的请求令牌验证机制。

存在严重的请求生命周期竞态:响应回调捕获的 pending_id 初始化为 -1,仅在 async_send_request(...) 返回后才被赋值为真实的 result.request_id。若响应回调在赋值前执行,会传入 expected_id = -1Session::end(),而此时 Session::pending_idstd::nullopt,导致条件 if (expected_id && pending_id != *expected_id) 判断为真,end() 返回 false,有效响应被静默丢弃。同时,超时回调调用 s->end(...) 时未传递 expected_id,使其总能成功,可在响应完成后覆盖有效状态。

需要在 begin() 时生成一个会话令牌,令响应和超时两条路径都必须验证该令牌才能更新终态。

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cxx/util/localization/engine.cc` around lines 149 - 203, The code has a
race between the async response callback and timeout because pending_id is set
only after async_send_request returns; fix by giving each request a dedicated
session token when session->begin() is called (e.g. generate a uint64_t
request_token inside begin and store it on Session as current_request_token
and/or an optional), pass that token into arm_timeout (capture it in the timer)
and into the async_send_request lambda (capture by value) instead of using the
shared pending_id pointer, and change Session::end to accept and check an
expected_token (compare against Session::current_request_token and only proceed
if equal), so both the response handler and the timeout handler validate the
same request token before mutating session state; update send to set the token
before calling async_send_request and session->track(result.request_id) as
before.

return true;
}
};

Expand All @@ -77,14 +210,12 @@ Localization::Localization(Config config)

Localization::~Localization() noexcept = default;

auto Localization::start_collecting(std::chrono::seconds seconds)
-> std::expected<void, std::string> {
return pimpl->start_collecting(seconds);
auto Localization::relocalize(RelocalizeMode mode, double x, double y, double yaw) -> bool {
return pimpl->send(mode, x, y, yaw);
}

auto Localization::start_localizing(const Eigen::Isometry3d& initial_solution)
-> std::future<std::expected<Eigen::Isometry3d, std::string>> {
return pimpl->start_localizing(initial_solution);
auto Localization::relocalize_status() const -> RelocalizeStatus {
return pimpl->session->snapshot();
}

} // namespace rmcs::navigation
Loading
Loading