diff --git a/include/dct/face/transport.hpp b/include/dct/face/transport.hpp index ab138d9..5b16d24 100644 --- a/include/dct/face/transport.hpp +++ b/include/dct/face/transport.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #if 1 @@ -370,6 +371,7 @@ struct TransportUdpP final : TransportUdp { struct TransportTcp : Transport { tcp::socket sock_; + uint32_t sockId_{0}; bool isConnected_{false}; bool didCCB_{false}; @@ -389,15 +391,25 @@ struct TransportTcp : Transport { // to data ratio. DCT's encap cost is ~100 bytes (due almost entirely to signing overhead - // 64 bytes of signature plus 32 bytes of key locator) so an 8K mtu results in 99% efficiency // with 65ms worst-case jitter on a 1Mbps backhaul. - constexpr ptrdiff_t mtu() const noexcept final { return max_pkt_size; } + constexpr ptrdiff_t mtu() const noexcept final { return max_pkt_size - 128; } constexpr std::chrono::milliseconds tts() const noexcept final { return std::chrono::milliseconds(1500/(1000/8)); } + // Outgoing packet queue: deferred sends, started one at a time from the front + std::queue> outqueue_; + // Maximum number of packets to keep in outgoing queue. + // This number should not be too large, otherwise the queue will take forever to flush + // after a network partition that does not cause a TCP reset. + static constexpr size_t outqueue_max_ = 128; + TransportTcp(asio::io_context& ioc, onRcv&& rcb, onConnect&& ccb) : Transport(std::move(rcb), std::move(ccb)), sock_{ioc} { } void close() { + dct::log(L_INFO)("TransportTcp::close sock={}", sockId_); roff_ = 0; isConnected_ = false; + sockId_++; + while (!outqueue_.empty()) outqueue_.pop(); // drop every queued send boost::system::error_code ec; sock_.set_option(asio::socket_base::linger{false,0}, ec); sock_.close(ec); @@ -412,6 +424,7 @@ struct TransportTcp : Transport { virtual void restart() = 0; void finishRestart() { + dct::log(L_INFO)("TransportTcp::connected sock={}", sockId_); isConnected_ = true; issueRead(); if (! 
didCCB_) { @@ -433,8 +446,11 @@ struct TransportTcp : Transport { void issueRead() { if (! isConnected_) return; sock_.async_read_some(asio::buffer(rbuf_) + roff_, - [this](const boost::system::error_code& ec, std::size_t rdlen) { + [this, sockId=sockId_](const boost::system::error_code& ec, std::size_t rdlen) { + if (!isConnected_ || sockId != sockId_) return; // connection changed + if (ec) { + dct::log(L_WARN)("TransportTcp::read failed: {}", ec.message()); restart(); return; } @@ -445,6 +461,7 @@ struct TransportTcp : Transport { // there's enough data to get tlv type and length const auto l = tlvLength(d); if (l == 0) { // invalid TLV - restart connection + dct::log(L_ERROR)( "TransportTcp::read got invalid TLV, l={}", l); restart(); return; } @@ -462,14 +479,37 @@ struct TransportTcp : Transport { }); } + void outqueue_continue() { + if (!outqueue_.empty()) outqueue_.pop(); + if (!outqueue_.empty()) outqueue_.front()(); + } + void send_pkt(const uint8_t* pkt, size_t len, _sendCb&& cb) { if (! isConnected_) return; - sock_.async_write_some(asio::buffer(pkt, len), - [len, this, cb = std::move(cb)](const boost::system::error_code& ec, std::size_t sent) { + if (outqueue_.size() >= outqueue_max_) return; + + // It is likely pointless to send anything that has been queued + // for too long; just skip these packets to make way for new ones (monotonic clock, so wall-clock adjustments do not affect expiry) + auto expiry = std::chrono::steady_clock::now() + std::chrono::seconds(6); + + outqueue_.push([this, pkt, len, cb = std::move(cb), expiry]() { + if (expiry < std::chrono::steady_clock::now()) return outqueue_continue(); + send_pkt_internal(pkt, len, cb); + }); + if (outqueue_.size() == 1) outqueue_.front()(); + } + + void send_pkt_internal(const uint8_t* pkt, size_t len, const _sendCb& cb) { + if (! 
isConnected_) return; + asio::async_write(sock_, asio::buffer(pkt, len), + [this, len, &cb, sockId=sockId_](const boost::system::error_code& ec, std::size_t sent) { + if (!isConnected_ || sockId != sockId_) return; // connection changed + if (ec.failed()) { + dct::log(L_WARN)("TransportTcp::write failed: {} ({})", ec.message(), ec.value()); switch (ec.value()) { default: - throw std::runtime_error(dct::format("send failed: {}", ec.message())); + // any other error: already logged above; fall through and reconnect instead of throwing case EPIPE: case ECONNRESET: case ECANCELED: case ETIMEDOUT: restart(); // try to reconnect @@ -477,7 +517,8 @@ struct TransportTcp : Transport { } /*NotReached*/ } - if (sent < len) print("tcp send botch: sent {} of {}\n", sent, len); + if (sent < len) dct::log(L_TRACE)("TransportTcp::send batch: sent {} of {}", sent, len); + else outqueue_continue(); }); } }; @@ -488,12 +529,9 @@ struct TransportTcpA final : TransportTcp { void on_connect(const boost::system::error_code& ec) { if (ec.failed()) { - if (ec.value() != ECONNREFUSED && ec.value() != ETIMEDOUT) { - dct::print("tcp connect failed: {}", ec.message()); - throw runtime_error(dct::format("connect failed: {}", ec.message())); - } - restart(); - return; + dct::log(L_WARN)("TransportTcpA tcp failed to connect to {}: {} ({})", + peer_.address().to_string(), ec.message(), ec.value()); + return doAfter(10s, [this](){ restart(); }); // retry after 10s instead of immediately; prevents log spam } finishRestart(); }