From cdeb7a0687fdf88b5e378569a9215428a36de06f Mon Sep 17 00:00:00 2001 From: Serein Pfeiffer Date: Thu, 9 Apr 2026 11:04:31 +0200 Subject: [PATCH] Fix WebSocket crash from oversized client messages --- libs/http-datasource/src/http-server.cpp | 6 + libs/http-service/src/tiles-ws-controller.cpp | 204 ++++++++++-------- 2 files changed, 119 insertions(+), 91 deletions(-) diff --git a/libs/http-datasource/src/http-server.cpp b/libs/http-datasource/src/http-server.cpp index 422ab1b6..e39cb847 100644 --- a/libs/http-datasource/src/http-server.cpp +++ b/libs/http-datasource/src/http-server.cpp @@ -153,6 +153,12 @@ void HttpServer::go(std::string const& interfaceAddr, uint16_t port, uint32_t wa // Allow derived class to set up the server. setup(app); + // Raise Drogon's default WebSocket client message size limit + // (128 KB) to 10 MB. Large tile requests with many tile IDs + // can exceed the default and trigger connection shutdown, + // cascading into a crash (std::terminate). + app.setClientMaxWebSocketMessageSize(10 * 1024 * 1024); + app.addListener(interfaceAddr, port); app.registerBeginningAdvice([this]() { diff --git a/libs/http-service/src/tiles-ws-controller.cpp b/libs/http-service/src/tiles-ws-controller.cpp index f9b05c77..672a0edb 100644 --- a/libs/http-service/src/tiles-ws-controller.cpp +++ b/libs/http-service/src/tiles-ws-controller.cpp @@ -1095,61 +1095,68 @@ class TilesWsSession : public std::enable_shared_from_this return; if (!layer) return; - std::optional> stringPoolCommit; - std::vector pullDispatches; - { - std::lock_guard lock(mutex_); - if (cancelled_) - return; - auto requestedTileKey = matchDesiredTileKeyLocked( - layer->id(), - layer->layerInfo() ? std::max(1U, layer->layerInfo()->stages_) : 1U); - // Late-arriving tile for an outdated request: drop before serialization work. - if (!requestedTileKey.has_value()) { - return; - } + try { + std::optional> stringPoolCommit; + std::vector pullDispatches; - if (currentWriteBatch_.has_value()) { - raise("TilesWsSession writer callback re-entered"); - } - currentWriteBatch_.emplace(); - writer_->write(layer); - auto batch = std::move(*currentWriteBatch_); - currentWriteBatch_.reset(); - - // If a StringPool message was generated, the writer updates writerOffsets_ - // to the new highest string ID for this node after emitting it. - const auto nodeId = layer->nodeId(); - const auto it = writerOffsets_.find(nodeId); - if (it != writerOffsets_.end()) { - const auto newOffset = it->second; - for (auto const& m : batch) { - if (m.type == TileLayerStream::MessageType::StringPool) { - stringPoolCommit = std::make_pair(nodeId, newOffset); - break; - } + { + std::lock_guard lock(mutex_); + if (cancelled_) + return; + auto requestedTileKey = matchDesiredTileKeyLocked( + layer->id(), + layer->layerInfo() ? std::max(1U, layer->layerInfo()->stages_) : 1U); + // Late-arriving tile for an outdated request: drop before serialization work. + if (!requestedTileKey.has_value()) { + return; } - } - for (auto& m : batch) { - OutgoingFrame frame; - frame.bytes = std::move(m.bytes); - frame.type = m.type; - if (m.type == TileLayerStream::MessageType::StringPool) { - frame.stringPoolCommit = stringPoolCommit; - frame.requestedTileKey = *requestedTileKey; + if (currentWriteBatch_.has_value()) { + raise("TilesWsSession writer callback re-entered"); + } + currentWriteBatch_.emplace(); + writer_->write(layer); + auto batch = std::move(*currentWriteBatch_); + currentWriteBatch_.reset(); + + // If a StringPool message was generated, the writer updates writerOffsets_ + // to the new highest string ID for this node after emitting it. + const auto nodeId = layer->nodeId(); + const auto it = writerOffsets_.find(nodeId); + if (it != writerOffsets_.end()) { + const auto newOffset = it->second; + for (auto const& m : batch) { + if (m.type == TileLayerStream::MessageType::StringPool) { + stringPoolCommit = std::make_pair(nodeId, newOffset); + break; + } + } } - if (m.type == TileLayerStream::MessageType::TileFeatureLayer - || m.type == TileLayerStream::MessageType::TileSourceDataLayer) { - frame.requestedTileKey = *requestedTileKey; + + for (auto& m : batch) { + OutgoingFrame frame; + frame.bytes = std::move(m.bytes); + frame.type = m.type; + if (m.type == TileLayerStream::MessageType::StringPool) { + frame.stringPoolCommit = stringPoolCommit; + frame.requestedTileKey = *requestedTileKey; + } + if (m.type == TileLayerStream::MessageType::TileFeatureLayer + || m.type == TileLayerStream::MessageType::TileSourceDataLayer) { + frame.requestedTileKey = *requestedTileKey; + } + enqueueOutgoingLocked(std::move(frame)); } - enqueueOutgoingLocked(std::move(frame)); + // Newly queued frames can immediately satisfy blocked pull waiters. + drainReadyPullWaitersLocked(pullDispatches); } - // Newly queued frames can immediately satisfy blocked pull waiters. - drainReadyPullWaitersLocked(pullDispatches); + dispatchPullResults(std::move(pullDispatches)); + } + catch (const std::exception& e) { + log().error("Failed to stream tile layer: {}", e.what()); + cancelNoStatus(); } - dispatchPullResults(std::move(pullDispatches)); } /// Update per-request completion state and emit status when it changes. @@ -1197,7 +1204,15 @@ class TilesWsSession : public std::enable_shared_from_this cancelNoStatus(); return; } - conn->send(encodeStreamMessage(type, payload), drogon::WebSocketMessageType::Binary); + try { + conn->send( + encodeStreamMessage(type, payload), + drogon::WebSocketMessageType::Binary); + } + catch (const std::exception& e) { + log().warn("WebSocket send failed: {}", e.what()); + cancelNoStatus(); + } } /// Send a status frame describing the current request statuses. @@ -1339,66 +1354,73 @@ class TilesWebSocketController final : public drogon::WebSocketControllergetContext(); - if (!session) { - // This is a defensive fallback for unexpected context loss. - session = std::make_shared(service_, conn, AuthHeaders{}); - session->registerForMetrics(); - { - std::lock_guard lock(gSessionRegistryMutex); - gSessionRegistry[session->clientId()] = session; + try { + auto session = conn->getContext(); + if (!session) { + // This is a defensive fallback for unexpected context loss. + session = std::make_shared(service_, conn, AuthHeaders{}); + session->registerForMetrics(); + { + std::lock_guard lock(gSessionRegistryMutex); + gSessionRegistry[session->clientId()] = session; + } + conn->setContext(session); } - conn->setContext(session); - } - - if (type != drogon::WebSocketMessageType::Text) { - const auto payload = nlohmann::json::object({ - {"type", "mapget.tiles.status"}, - {"allDone", true}, - {"requests", nlohmann::json::array()}, - {"message", "Expected a text message containing JSON."}, - }).dump(); - conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary); - return; - } - nlohmann::json j; - try { - j = nlohmann::json::parse(message); - } - catch (const std::exception& e) { - const auto payload = nlohmann::json::object({ - {"type", "mapget.tiles.status"}, - {"allDone", true}, - {"requests", nlohmann::json::array()}, - {"message", fmt::format("Invalid JSON: {}", e.what())}, - }).dump(); - conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary); - return; - } + if (type != drogon::WebSocketMessageType::Text) { + const auto payload = nlohmann::json::object({ + {"type", "mapget.tiles.status"}, + {"allDone", true}, + {"requests", nlohmann::json::array()}, + {"message", "Expected a text message containing JSON."}, + }).dump(); + conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary); + return; + } - // Patch per-connection string pool offsets if supplied. - if (j.contains("stringPoolOffsets")) { - std::string errorMessage; - if (!session->applyStringPoolOffsetsPatch(j["stringPoolOffsets"], errorMessage)) { + nlohmann::json j; + try { + j = nlohmann::json::parse(message); + } + catch (const std::exception& e) { const auto payload = nlohmann::json::object({ {"type", "mapget.tiles.status"}, {"allDone", true}, {"requests", nlohmann::json::array()}, - {"message", std::move(errorMessage)}, + {"message", fmt::format("Invalid JSON: {}", e.what())}, }).dump(); conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary); return; } - } - const auto requestId = session->allocateRequestId(j); - session->updateFromClientRequest(j, requestId); + // Patch per-connection string pool offsets if supplied. + if (j.contains("stringPoolOffsets")) { + std::string errorMessage; + if (!session->applyStringPoolOffsetsPatch(j["stringPoolOffsets"], errorMessage)) { + const auto payload = nlohmann::json::object({ + {"type", "mapget.tiles.status"}, + {"allDone", true}, + {"requests", nlohmann::json::array()}, + {"message", std::move(errorMessage)}, + }).dump(); + conn->send(encodeStreamMessage(TileLayerStream::MessageType::Status, payload), drogon::WebSocketMessageType::Binary); + return; + } + } + + const auto requestId = session->allocateRequestId(j); + session->updateFromClientRequest(j, requestId); + } + catch (const std::exception& e) { + log().error("WebSocket message handler failed: {}", e.what()); + } } /// Abort outstanding backend work once the websocket is closed.