Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions include/dct/face/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <memory>
#include <string>
#include <string_view>
#include <queue>

#include <boost/asio/version.hpp>
#if 1
Expand Down Expand Up @@ -370,6 +371,7 @@ struct TransportUdpP final : TransportUdp {

struct TransportTcp : Transport {
tcp::socket sock_;
uint32_t sockId_{0};
bool isConnected_{false};
bool didCCB_{false};

Expand All @@ -389,15 +391,25 @@ struct TransportTcp : Transport {
// to data ratio. DCT's encap cost is ~100 bytes (due almost entirely to signing overhead -
// 64 bytes of signature plus 32 bytes of key locator) so an 8K mtu results in 99% efficiency
// with 65ms worst-case jitter on a 1Mbps backhaul.
constexpr ptrdiff_t mtu() const noexcept final { return max_pkt_size; }
constexpr ptrdiff_t mtu() const noexcept final { return max_pkt_size - 128; }
constexpr std::chrono::milliseconds tts() const noexcept final { return std::chrono::milliseconds(1500/(1000/8)); }

// Outgoing packet queue
std::queue<ofats::any_invocable<void()>> outqueue_;
// Maximum number of packets to keep in outgoing queue.
// This number should not be too largem, otherwise the queue will take forever to flush
// after a network partition that does not cause a TCP reset.
static constexpr size_t outqueue_max_ = 128;

TransportTcp(asio::io_context& ioc, onRcv&& rcb, onConnect&& ccb)
: Transport(std::move(rcb), std::move(ccb)), sock_{ioc} { }

void close() {
dct::log(L_INFO)("TransportTcp::close sock={}", sockId_);
roff_ = 0;
isConnected_ = false;
sockId_++;
while (!outqueue_.empty()) outqueue_.pop(); // clear
boost::system::error_code ec;
sock_.set_option(asio::socket_base::linger{false,0}, ec);
sock_.close(ec);
Expand All @@ -412,6 +424,7 @@ struct TransportTcp : Transport {
virtual void restart() = 0;

void finishRestart() {
dct::log(L_INFO)("TransportTcp::connected sock={}", sockId_);
isConnected_ = true;
issueRead();
if (! didCCB_) {
Expand All @@ -433,8 +446,11 @@ struct TransportTcp : Transport {
void issueRead() {
if (! isConnected_) return;
sock_.async_read_some(asio::buffer(rbuf_) + roff_,
[this](const boost::system::error_code& ec, std::size_t rdlen) {
[this, sockId=sockId_](const boost::system::error_code& ec, std::size_t rdlen) {
if (!isConnected_ || sockId != sockId_) return; // connection changed

if (ec) {
dct::log(L_WARN)("TransportTcp::read failed: {}", ec.message());
restart();
return;
}
Expand All @@ -445,6 +461,7 @@ struct TransportTcp : Transport {
// there's enough data to get tlv type and length
const auto l = tlvLength(d);
if (l == 0) { // invalid TLV - restart connection
dct::log(L_ERROR)( "TransportTcp::read got invalid TLV, l={}", l);
restart();
return;
}
Expand All @@ -462,22 +479,46 @@ struct TransportTcp : Transport {
});
}

void outqueue_continue() {
if (!outqueue_.empty()) outqueue_.pop();
if (!outqueue_.empty()) outqueue_.front()();
}

void send_pkt(const uint8_t* pkt, size_t len, _sendCb&& cb) {
if (! isConnected_) return;
sock_.async_write_some(asio::buffer(pkt, len),
[len, this, cb = std::move(cb)](const boost::system::error_code& ec, std::size_t sent) {
if (outqueue_.size() >= outqueue_max_) return;

// It is likely pointless to send anything that has been queued
// for too long; just skip these packets to make way for new ones
auto expiry = std::chrono::system_clock::now() + std::chrono::seconds(6);

outqueue_.push([this, pkt, len, cb = std::move(cb), expiry]() {
if (expiry < std::chrono::system_clock::now()) return outqueue_continue();
send_pkt_internal(pkt, len, cb);
});
if (outqueue_.size() == 1) outqueue_.front()();
}

void send_pkt_internal(const uint8_t* pkt, size_t len, const _sendCb& cb) {
if (! isConnected_) return;
asio::async_write(sock_, asio::buffer(pkt, len),
[this, len, &cb, sockId=sockId_](const boost::system::error_code& ec, std::size_t sent) {
if (!isConnected_ || sockId != sockId_) return; // connection changed

if (ec.failed()) {
dct::log(L_WARN)("TransportTcp::write failed: {} ({})", ec.message(), ec.value());
switch (ec.value()) {
default:
throw std::runtime_error(dct::format("send failed: {}", ec.message()));
// throw std::runtime_error(dct::format("send failed: {}", ec.message()));

case EPIPE: case ECONNRESET: case ECANCELED: case ETIMEDOUT:
restart(); // try to reconnect
return;
}
/*NotReached*/
}
if (sent < len) print("tcp send botch: sent {} of {}\n", sent, len);
if (sent < len) dct::log(L_TRACE)("TransportTcp::send batch: sent {} of {}", sent, len);
else outqueue_continue();
});
}
};
Expand All @@ -488,12 +529,9 @@ struct TransportTcpA final : TransportTcp {

void on_connect(const boost::system::error_code& ec) {
if (ec.failed()) {
if (ec.value() != ECONNREFUSED && ec.value() != ETIMEDOUT) {
dct::print("tcp connect failed: {}", ec.message());
throw runtime_error(dct::format("connect failed: {}", ec.message()));
}
restart();
return;
dct::log(L_WARN)("TransportTcpA tcp failed to connect to {}: {} ({})",
peer_.address().to_string(), ec.message(), ec.value());
return doAfter(10s, [this](){ restart(); }); // prevent log spam
}
finishRestart();
}
Expand Down