Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libs/http-datasource/src/http-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ void HttpServer::go(std::string const& interfaceAddr, uint16_t port, uint32_t wa
// Allow derived class to set up the server.
setup(app);

// Raise Drogon's default WebSocket client message size limit
// (128 KB) to 10 MB. Large tile requests with many tile IDs
// can exceed the default and trigger connection shutdown,
// cascading into a crash (std::terminate).
app.setClientMaxWebSocketMessageSize(10 * 1024 * 1024);

app.addListener(interfaceAddr, port);

app.registerBeginningAdvice([this]() {
Expand Down
204 changes: 113 additions & 91 deletions libs/http-service/src/tiles-ws-controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,61 +1095,68 @@
return;
if (!layer)
return;
std::optional<std::pair<std::string, simfil::StringId>> stringPoolCommit;
std::vector<PullDispatch> pullDispatches;

{
std::lock_guard lock(mutex_);
if (cancelled_)
return;
auto requestedTileKey = matchDesiredTileKeyLocked(
layer->id(),
layer->layerInfo() ? std::max<uint32_t>(1U, layer->layerInfo()->stages_) : 1U);
// Late-arriving tile for an outdated request: drop before serialization work.
if (!requestedTileKey.has_value()) {
return;
}
try {
std::optional<std::pair<std::string, simfil::StringId>> stringPoolCommit;
std::vector<PullDispatch> pullDispatches;

if (currentWriteBatch_.has_value()) {
raise("TilesWsSession writer callback re-entered");
}
currentWriteBatch_.emplace();
writer_->write(layer);
auto batch = std::move(*currentWriteBatch_);
currentWriteBatch_.reset();

// If a StringPool message was generated, the writer updates writerOffsets_
// to the new highest string ID for this node after emitting it.
const auto nodeId = layer->nodeId();
const auto it = writerOffsets_.find(nodeId);
if (it != writerOffsets_.end()) {
const auto newOffset = it->second;
for (auto const& m : batch) {
if (m.type == TileLayerStream::MessageType::StringPool) {
stringPoolCommit = std::make_pair(nodeId, newOffset);
break;
}
{
std::lock_guard lock(mutex_);
if (cancelled_)
return;
auto requestedTileKey = matchDesiredTileKeyLocked(
layer->id(),
layer->layerInfo() ? std::max<uint32_t>(1U, layer->layerInfo()->stages_) : 1U);
// Late-arriving tile for an outdated request: drop before serialization work.
if (!requestedTileKey.has_value()) {
return;
}
}

for (auto& m : batch) {
OutgoingFrame frame;
frame.bytes = std::move(m.bytes);
frame.type = m.type;
if (m.type == TileLayerStream::MessageType::StringPool) {
frame.stringPoolCommit = stringPoolCommit;
frame.requestedTileKey = *requestedTileKey;
if (currentWriteBatch_.has_value()) {
raise("TilesWsSession writer callback re-entered");
}
currentWriteBatch_.emplace();
writer_->write(layer);
auto batch = std::move(*currentWriteBatch_);
currentWriteBatch_.reset();

// If a StringPool message was generated, the writer updates writerOffsets_
// to the new highest string ID for this node after emitting it.
const auto nodeId = layer->nodeId();
const auto it = writerOffsets_.find(nodeId);
if (it != writerOffsets_.end()) {
const auto newOffset = it->second;
for (auto const& m : batch) {
if (m.type == TileLayerStream::MessageType::StringPool) {
stringPoolCommit = std::make_pair(nodeId, newOffset);
break;
}
}
}
if (m.type == TileLayerStream::MessageType::TileFeatureLayer
|| m.type == TileLayerStream::MessageType::TileSourceDataLayer) {
frame.requestedTileKey = *requestedTileKey;

for (auto& m : batch) {
OutgoingFrame frame;
frame.bytes = std::move(m.bytes);
frame.type = m.type;
if (m.type == TileLayerStream::MessageType::StringPool) {
frame.stringPoolCommit = stringPoolCommit;
frame.requestedTileKey = *requestedTileKey;
}
if (m.type == TileLayerStream::MessageType::TileFeatureLayer
|| m.type == TileLayerStream::MessageType::TileSourceDataLayer) {
frame.requestedTileKey = *requestedTileKey;
}
enqueueOutgoingLocked(std::move(frame));
}
enqueueOutgoingLocked(std::move(frame));
// Newly queued frames can immediately satisfy blocked pull waiters.
drainReadyPullWaitersLocked(pullDispatches);
}
// Newly queued frames can immediately satisfy blocked pull waiters.
drainReadyPullWaitersLocked(pullDispatches);
dispatchPullResults(std::move(pullDispatches));
}
catch (const std::exception& e) {
log().error("Failed to stream tile layer: {}", e.what());
cancelNoStatus();
}
dispatchPullResults(std::move(pullDispatches));
}

/// Update per-request completion state and emit status when it changes.
Expand Down Expand Up @@ -1197,7 +1204,15 @@
cancelNoStatus();
return;
}
conn->send(encodeStreamMessage(type, payload), drogon::WebSocketMessageType::Binary);
try {
conn->send(
encodeStreamMessage(type, payload),
drogon::WebSocketMessageType::Binary);
}
catch (const std::exception& e) {
log().warn("WebSocket send failed: {}", e.what());
cancelNoStatus();
}
}

/// Send a status frame describing the current request statuses.
Expand Down Expand Up @@ -1339,66 +1354,73 @@
}

/// Handle control and request messages from the websocket client.
/// Wrapped in try-catch because Drogon calls this with no exception
/// protection — an uncaught exception would terminate the process.
void handleNewMessage(
const drogon::WebSocketConnectionPtr& conn,
std::string&& message,
const drogon::WebSocketMessageType& type) override
{
auto session = conn->getContext<TilesWsSession>();
if (!session) {
// This is a defensive fallback for unexpected context loss.
session = std::make_shared<TilesWsSession>(service_, conn, AuthHeaders{});
session->registerForMetrics();
{
std::lock_guard lock(gSessionRegistryMutex);
gSessionRegistry[session->clientId()] = session;
try {
auto session = conn->getContext<TilesWsSession>();
if (!session) {
// This is a defensive fallback for unexpected context loss.
session = std::make_shared<TilesWsSession>(service_, conn, AuthHeaders{});
session->registerForMetrics();
{
std::lock_guard lock(gSessionRegistryMutex);
gSessionRegistry[session->clientId()] = session;
}
conn->setContext(session);
}
conn->setContext(session);
}

if (type != drogon::WebSocketMessageType::Text) {
const auto payload = nlohmann::json::object({
{"type", "mapget.tiles.status"},
{"allDone", true},
{"requests", nlohmann::json::array()},
{"message", "Expected a text message containing JSON."},
}).dump();
conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary);
return;
}

nlohmann::json j;
try {
j = nlohmann::json::parse(message);
}
catch (const std::exception& e) {
const auto payload = nlohmann::json::object({
{"type", "mapget.tiles.status"},
{"allDone", true},
{"requests", nlohmann::json::array()},
{"message", fmt::format("Invalid JSON: {}", e.what())},
}).dump();
conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary);
return;
}
if (type != drogon::WebSocketMessageType::Text) {
const auto payload = nlohmann::json::object({
{"type", "mapget.tiles.status"},
{"allDone", true},
{"requests", nlohmann::json::array()},
{"message", "Expected a text message containing JSON."},
}).dump();
conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary);
return;
}

// Patch per-connection string pool offsets if supplied.
if (j.contains("stringPoolOffsets")) {
std::string errorMessage;
if (!session->applyStringPoolOffsetsPatch(j["stringPoolOffsets"], errorMessage)) {
nlohmann::json j;
try {

Check warning on line 1389 in libs/http-service/src/tiles-ws-controller.cpp

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested try block into a separate method.

See more on https://sonarcloud.io/project/issues?id=ndsev_mapget&issues=AZ1xgVI4frTUmNjbgfYH&open=AZ1xgVI4frTUmNjbgfYH&pullRequest=156
j = nlohmann::json::parse(message);
}
catch (const std::exception& e) {
const auto payload = nlohmann::json::object({
{"type", "mapget.tiles.status"},
{"allDone", true},
{"requests", nlohmann::json::array()},
{"message", std::move(errorMessage)},
{"message", fmt::format("Invalid JSON: {}", e.what())},
}).dump();
conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary);
return;
}
}

const auto requestId = session->allocateRequestId(j);
session->updateFromClientRequest(j, requestId);
// Patch per-connection string pool offsets if supplied.
if (j.contains("stringPoolOffsets")) {
std::string errorMessage;
if (!session->applyStringPoolOffsetsPatch(j["stringPoolOffsets"], errorMessage)) {
const auto payload = nlohmann::json::object({
{"type", "mapget.tiles.status"},
{"allDone", true},
{"requests", nlohmann::json::array()},
{"message", std::move(errorMessage)},
}).dump();
conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary);
return;
}
}

const auto requestId = session->allocateRequestId(j);
session->updateFromClientRequest(j, requestId);
}
catch (const std::exception& e) {
log().error("WebSocket message handler failed: {}", e.what());
}
}

/// Abort outstanding backend work once the websocket is closed.
Expand Down
28 changes: 23 additions & 5 deletions libs/model/include/mapget/model/featurelayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ class TileFeatureLayer : public TileLayer, public simfil::ModelPool
public:
// Keep ModelPool::resolve<T> overloads visible alongside the override below.
using ModelPool::resolve;
using Ptr = std::shared_ptr<TileFeatureLayer>;

struct CloneCacheKey
{
TileFeatureLayer const* model_ = nullptr;
uint32_t address_ = 0;

[[nodiscard]] bool operator==(CloneCacheKey const& other) const = default;
};

struct CloneCacheKeyHash
{
[[nodiscard]] size_t operator()(CloneCacheKey const& key) const noexcept
{
auto const modelHash = std::hash<TileFeatureLayer const*>{}(key.model_);
auto const addressHash = std::hash<uint32_t>{}(key.address_);
return modelHash ^ (addressHash + 0x9e3779b9U + (modelHash << 6U) + (modelHash >> 2U));
}
};

using CloneCache = std::unordered_map<CloneCacheKey, simfil::ModelNode::Ptr, CloneCacheKeyHash>;

/**
* This constructor initializes a new TileFeatureLayer instance.
Expand Down Expand Up @@ -257,9 +278,6 @@ class TileFeatureLayer : public TileLayer, public simfil::ModelPool
model_ptr<Feature> find(std::string_view const& type, KeyValueViewPairs const& queryIdParts) const;
model_ptr<Feature> find(std::string_view const& type, KeyValuePairs const& queryIdParts) const;

/** Shared pointer type */
using Ptr = std::shared_ptr<TileFeatureLayer>;

/** Optional staged-loading index (0-based) for this feature tile. */
[[nodiscard]] std::optional<uint32_t> stage() const override;
void setStage(std::optional<uint32_t> stage) override;
Expand Down Expand Up @@ -340,7 +358,7 @@ class TileFeatureLayer : public TileLayer, public simfil::ModelPool
* be appended to the existing feature.
*/
void clone(
std::unordered_map<uint32_t, simfil::ModelNode::Ptr>& clonedModelNodes,
CloneCache& clonedModelNodes,
TileFeatureLayer::Ptr const& otherLayer,
Feature const& otherFeature,
std::string_view const& type,
Expand All @@ -352,7 +370,7 @@ class TileFeatureLayer : public TileLayer, public simfil::ModelPool
* of nodes which are referenced multiple times.
*/
simfil::ModelNode::Ptr clone(
std::unordered_map<uint32_t, simfil::ModelNode::Ptr>& clonedModelNodes,
CloneCache& clonedModelNodes,
TileFeatureLayer::Ptr const& otherLayer,
simfil::ModelNode::Ptr const& otherNode);

Expand Down
6 changes: 5 additions & 1 deletion libs/model/src/attrlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ bool AttributeLayerList::forEachLayer(
{
if (!cb)
return false;
for(auto const& [stringId, value] : fields()) {
auto local = localObject();
for (auto const& [stringId, value] : local->fields()) {
if (auto layerName = model().strings()->resolve(stringId)) {
if (value->addr().column() != TileFeatureLayer::ColumnId::AttributeLayers) {
log().warn("Don't add anything other than AttributeLayers into AttributeLayerLists!");
Expand All @@ -103,6 +104,9 @@ bool AttributeLayerList::forEachLayer(
return false;
}
}
if (auto ext = extension()) {
return ext->forEachLayer(cb);
}
return true;
}

Expand Down
7 changes: 5 additions & 2 deletions libs/model/src/featureid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ std::string FeatureId::toString() const
}

if (values_) {
for (auto const& value : *values_) {
appendNodeValueToString(result, value);
auto const limit = std::min<size_t>(partNames_.size(), visibleValueIndices_.size());
for (size_t i = 0; i < limit; ++i) {
appendNodeValueToString(
result,
values_->at(static_cast<int64_t>(visibleValueIndices_[i])));
}
}

Expand Down
Loading
Loading