diff --git a/statshouse.hpp b/statshouse.hpp index f8294e0..6a9dd3f 100644 --- a/statshouse.hpp +++ b/statshouse.hpp @@ -21,6 +21,7 @@ #include #include #include // PRIu32, PRIu64 +#include #include // va_list, va_start, va_end #include // vsnprintf #include @@ -46,7 +47,11 @@ #define close closesocket #else #include +#include +#include #include +#include +#include #include #define STATSHOUSE_UNLIKELY(x) __builtin_expect((x), 0) // could improve packing performance on your platform. Set to (x) to disable @@ -151,11 +156,17 @@ struct nop_mutex { // will select lock policy later namespace test { template struct traits; } +enum class flush_packet_result { + ok, + would_block, + error, +}; + class TransportUDPBase { friend class Registry; template friend struct test::traits; public: - using mutex = nop_mutex; // for now use in experiments + using mutex = std::mutex; enum { DEFAULT_PORT = 13337, SAFE_DATAGRAM_SIZE = 508, // https://stackoverflow.com/questions/1098897/what-is-the-largest-safe-udp-packet-size-on-the-internet @@ -180,6 +191,9 @@ class TransportUDPBase { } // Assigns arbitrary tag value by key name MetricBuilder & tag(string_view key, string_view str) { + if (key.size() == 2 && key.data()[0] == '_' && key.data()[1] == 'h') { + host_set = true; + } auto begin = buffer + buffer_pos; auto end = buffer + MAX_FULL_KEY_SIZE; if (STATSHOUSE_UNLIKELY(!(begin = pack_string(begin, end, key)))) { did_not_fit = true; return *this; } @@ -242,6 +256,7 @@ class TransportUDPBase { } TransportUDPBase *transport; bool env_set = false; // so current default_env will be added in pack_header even if set later. + bool host_set = false; // if explicit _h is set, do not add default hostname tag. bool did_not_fit = false; // we do not want any exceptions in writing metrics size_t next_tag = 1; size_t tags_count = 0; @@ -250,9 +265,33 @@ class TransportUDPBase { char buffer[MAX_FULL_KEY_SIZE]; // Uninitialized, due to performance considerations. }; - explicit TransportUDPBase() = default; + explicit TransportUDPBase() { + char host_buf[256] = {}; + if (::gethostname(host_buf, sizeof(host_buf) - 1) == 0) { + host_buf[sizeof(host_buf) - 1] = '\0'; + host_tag.assign(host_buf); + } + if (host_tag.empty()) { + return; + } + host_tag_tl_pair.resize(4 + host_tag.size() + 4); // key name, host, padding + auto begin = &host_tag_tl_pair[0]; + auto end = begin + host_tag_tl_pair.size(); + begin = pack32(begin, end, 2 | (uint32_t('_') << 8) | (uint32_t('h') << 16)); + begin = pack_string(begin, end, host_tag); + host_tag_tl_pair.resize(begin - &host_tag_tl_pair[0]); + } virtual ~TransportUDPBase() = default; + void set_app_name(string_view app) { + std::lock_guard lo(mu); + app_name = app; + } + std::string get_app_name() const { + std::lock_guard lo(mu); + return app_name; + } + void set_default_env(string_view env) { // automatically sent as tag '0' std::lock_guard lo(mu); default_env = env; @@ -355,6 +394,16 @@ class TransportUDPBase { protected: Stats stats; // allow implementations to update stats directly for simplicity + mutable mutex mu; + bool write_count_locked(const MetricBuilder &metric, double count, uint32_t tsUnixSec = 0) { + return write_count_impl(metric, count, tsUnixSec); + } + bool write_values_locked(const MetricBuilder &metric, const double *values, size_t values_count, double count = 1, uint32_t tsUnixSec = 0) { + return write_values_impl(metric, values, values_count, count, tsUnixSec); + } + string_view default_env_locked() const { return default_env; } + string_view app_name_locked() const { return app_name; } + private: enum { TL_INT_SIZE = 4, @@ -370,12 +419,14 @@ class TransportUDPBase { TL_STATSHOUSE_METRIC_UNIQUE_FIELDS_MASK = 1 << 2, BATCH_HEADER_LEN = TL_INT_SIZE * 3 // TL tag, fields_mask, # of batches }; - mutable mutex mu; size_t batch_size = 0; // we fill packet header before sending size_t packet_len = BATCH_HEADER_LEN; // reserve space for metric batch header std::string default_env; std::string default_env_tl_pair; // "0", default_env in TL format to optimized writing + std::string app_name; + std::string host_tag; + std::string host_tag_tl_pair; // "_h", hostname in TL format; size_t max_payload_size = DEFAULT_DATAGRAM_SIZE; bool immediate_flush = false; @@ -561,13 +612,19 @@ class TransportUDPBase { if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, metric.buffer_pos))) { return nullptr; } std::memcpy(begin, metric.buffer, metric.buffer_pos); const bool add_env = !metric.env_set && !default_env_tl_pair.empty(); - put32(begin + metric.tags_count_pos(), metric.tags_count + int(add_env)); + const bool add_host = !metric.host_set && !host_tag_tl_pair.empty(); + put32(begin + metric.tags_count_pos(), metric.tags_count + int(add_env) + int(add_host)); begin += metric.buffer_pos; if (add_env) { if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, default_env_tl_pair.size()))) { return nullptr; } std::memcpy(begin, default_env_tl_pair.data(), default_env_tl_pair.size()); begin += default_env_tl_pair.size(); } + if (add_host) { + if (STATSHOUSE_UNLIKELY(!enoughSpace(begin, end, host_tag_tl_pair.size()))) { return nullptr; } + std::memcpy(begin, host_tag_tl_pair.data(), host_tag_tl_pair.size()); + begin += host_tag_tl_pair.size(); + } if (fields_mask & TL_STATSHOUSE_METRIC_COUNTER_FIELDS_MASK) { if (STATSHOUSE_UNLIKELY(!(begin = pack64(begin, end, doubleBits(counter))))) { return nullptr;} } @@ -658,7 +715,7 @@ class TransportUDPBase { flush_impl(now); } } - virtual bool on_flush_packet(const char * data, size_t size, size_t batch_size) = 0; + virtual flush_packet_result on_flush_packet(const char * data, size_t size, size_t batch_size) = 0; void flush_impl(uint32_t now) { packet_ts = now; if (batch_size == 0) { @@ -668,12 +725,12 @@ class TransportUDPBase { put32(packet + TL_INT_SIZE, 0); // fields mask put32(packet + 2*TL_INT_SIZE, uint32_t(batch_size)); // batch size - auto result = on_flush_packet(packet, packet_len, batch_size); - if (result) { + const flush_packet_result result = on_flush_packet(packet, packet_len, batch_size); + if (result == flush_packet_result::ok) { ++stats.packets_sent; stats.metrics_sent += batch_size; stats.bytes_sent += packet_len; - } else { + } else if (result == flush_packet_result::error) { ++stats.packets_failed; stats.metrics_failed += batch_size; } @@ -748,12 +805,12 @@ class TransportUDP : public TransportUDPBase { int udp_socket = -1; bool dummy_instance = false; // better than separate class, because can be set depending on config - bool on_flush_packet(const char * data, size_t size, size_t batch_size) override { + flush_packet_result on_flush_packet(const char * data, size_t size, size_t batch_size) override { if (dummy_instance) { - return true; + return flush_packet_result::ok; } if (!is_socket_valid()) { // we reported connection error after start - return false; + return flush_packet_result::error; } #ifdef _WIN32 @@ -763,16 +820,16 @@ class TransportUDP : public TransportUDPBase { #endif if (result >= 0) { - return true; + return flush_packet_result::ok; } auto err = errno; if (err == EAGAIN) { // || err == EWOULDBLOCK ++stats.packets_overflow; stats.metrics_overflow += batch_size; - return false; + return flush_packet_result::would_block; } set_errno_error(err, "statshouse::TransportUDP sendto() failed"); - return false; + return flush_packet_result::error; } int create_socket(const std::string &ip, int port) { @@ -833,6 +890,459 @@ class TransportUDP : public TransportUDPBase { } }; +class TransportTCP : public TransportUDPBase { + template friend struct test::traits; +public: + TransportTCP():TransportTCP("127.0.0.1", DEFAULT_PORT) {} + TransportTCP(const std::string &host, int port, const std::string &resolve_targets = "") + : host_{host} + , resolve_targets_{resolve_targets} + , port_{port} { + (void)refresh_pools(); + primary_worker_.th = std::thread(&TransportTCP::run_worker, this, &primary_worker_); + secondary_worker_.th = std::thread(&TransportTCP::run_worker, this, &secondary_worker_); + dns_refresh_thread_ = std::thread(&TransportTCP::run_dns_refresh, this); + } + TransportTCP(const TransportTCP &) = delete; + TransportTCP &operator=(const TransportTCP &) = delete; + ~TransportTCP() override { + flush(true); + stop_.store(true); + { + std::lock_guard lock{primary_worker_.mu}; + primary_worker_.cv.notify_all(); + } + { + std::lock_guard lock{secondary_worker_.mu}; + secondary_worker_.cv.notify_all(); + } + if (primary_worker_.th.joinable()) { + primary_worker_.th.join(); + } + if (secondary_worker_.th.joinable()) { + secondary_worker_.th.join(); + } + close_socket(primary_worker_.sock); + close_socket(secondary_worker_.sock); + if (dns_refresh_thread_.joinable()) { + { + std::lock_guard lock{dns_mu_}; + dns_cv_.notify_all(); + } + dns_refresh_thread_.join(); + } + } + +private: + enum { tcpConnBucketCount = 512 }; + struct resolved_addr { + ::sockaddr_storage storage{}; + socklen_t len = 0; + }; + struct worker { + std::mutex mu; + std::condition_variable cv; + std::deque> queue; + std::vector pool; + size_t next_addr = 0; + int sock = -1; + std::thread th; + std::atomic would_block_bytes{0}; + }; + struct send_result { + bool ok = false; + int err = 0; + size_t written = 0; + }; + + std::string host_; + std::string resolve_targets_; + int port_ = 0; + worker primary_worker_; + worker secondary_worker_; + std::mutex route_mu_; + worker *primary_route_ = &primary_worker_; + worker *secondary_route_ = &secondary_worker_; + std::thread dns_refresh_thread_; + std::mutex dns_mu_; + std::condition_variable dns_cv_; + std::atomic_bool stop_{false}; + std::mt19937 rng_{std::random_device{}()}; + + flush_packet_result on_flush_packet(const char *data, size_t size, size_t /*batch_size*/) override { + if (size > 0xffff) { + stats.last_error = "statshouse::TransportTCP framing error: packet too large " + std::to_string(size); + return flush_packet_result::error; + } + std::vector payload(4 + size, '\0'); + const uint32_t body_len = static_cast(size); + payload[0] = static_cast(body_len); + payload[1] = static_cast(body_len >> 8); + payload[2] = static_cast(body_len >> 16); + payload[3] = static_cast(body_len >> 24); + std::memcpy(&payload[4], data, size); + worker *primary = nullptr; + worker *secondary = nullptr; + { + std::lock_guard lock{route_mu_}; + primary = primary_route_; + secondary = secondary_route_; + } + if (enqueue(*primary, payload)) { + return flush_packet_result::ok; + } + if (enqueue(*secondary, payload)) { + std::lock_guard lock{route_mu_}; + std::swap(primary_route_, secondary_route_); + return flush_packet_result::ok; + } + primary_worker_.would_block_bytes.fetch_add(payload.size(), std::memory_order_relaxed); + return flush_packet_result::would_block; + } + + bool enqueue(worker &w, const std::vector &payload) { + std::lock_guard lock{w.mu}; + if (w.queue.size() >= tcpConnBucketCount) { + return false; + } + w.queue.push_back(payload); + w.cv.notify_one(); + return true; + } + + void run_worker(worker *w) { + for (;;) { + std::vector payload; + { + std::unique_lock lock{w->mu}; + w->cv.wait(lock, [this, w]() { return stop_.load() || !w->queue.empty(); }); + if (w->queue.empty() && stop_.load()) { + break; + } + payload = std::move(w->queue.front()); + w->queue.pop_front(); + } + send_payload_with_reconnect(*w, payload); + } + } + + void send_payload_with_reconnect(worker &w, const std::vector &payload) { + for (;;) { + if (w.sock < 0 && !reconnect(w)) { + if (stop_.load()) { + return; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; + } + const send_result r = send_all(w.sock, &payload[0], payload.size()); + if (r.ok) { + report_would_block_metric_after_send(); + return; + } + set_errno_error(r.err, "statshouse::TransportTCP send() failed"); + close_socket(w.sock); + w.sock = -1; + return; // do not retry the same payload + } + } + + send_result send_all(int sock, const char *data, size_t size) { + send_result r{}; + while (r.written < size) { + const size_t left = size - r.written; +#ifdef _WIN32 + const int chunk = left > static_cast(INT_MAX) ? INT_MAX : static_cast(left); + const int n = ::send(sock, data + r.written, chunk, 0); +#else + const ssize_t n = ::send(sock, data + r.written, left, MSG_NOSIGNAL); +#endif + if (n > 0) { + r.written += static_cast(n); + continue; + } + if (n == 0) { + r.err = ECONNRESET; + return r; + } + if (errno == EINTR) { + continue; + } + r.err = errno; + return r; + } + r.ok = true; + return r; + } + + bool reconnect(worker &w) { + resolved_addr addr{}; + { + std::lock_guard lock{w.mu}; + if (w.pool.empty()) { + return false; + } + addr = w.pool[w.next_addr % w.pool.size()]; + ++w.next_addr; + } + int sock = ::socket(addr.storage.ss_family, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) { + set_errno_error(errno, "statshouse::TransportTCP socket() failed"); + return false; + } + if (!set_socket_timeouts(sock, 5000)) { + int err = errno; + ::close(sock); + set_errno_error(err, "statshouse::TransportTCP setsockopt() timeout failed"); + return false; + } + auto ap = reinterpret_cast(&addr.storage); + if (!connect_with_timeout(sock, ap, addr.len, 5000)) { + int err = errno; + ::close(sock); + set_errno_error(err, "statshouse::TransportTCP connect() failed"); + return false; + } + static const char kHeader[] = "statshousev1"; + const send_result header_write = send_all(sock, kHeader, sizeof(kHeader) - 1); + if (!header_write.ok) { + int err = header_write.err; + ::close(sock); + set_errno_error(err, "statshouse::TransportTCP send header failed"); + return false; + } + w.sock = sock; + return true; + } + + void run_dns_refresh() { + while (!stop_.load()) { + std::unique_lock lock{dns_mu_}; + dns_cv_.wait_for(lock, std::chrono::minutes(1), [this]() { return stop_.load(); }); + if (stop_.load()) { + break; + } + lock.unlock(); + (void)refresh_pools(); + } + } + + bool refresh_pools() { + std::vector addrs; + if (!resolve_addrs(addrs)) { + return false; + } + if (addrs.size() > 1) { + std::shuffle(addrs.begin(), addrs.end(), rng_); + } + const size_t mid = (addrs.size() + 1) / 2; + std::vector primary_pool(addrs.begin(), addrs.begin() + mid); + std::vector secondary_pool; + if (mid < addrs.size()) { + secondary_pool.assign(addrs.begin() + mid, addrs.end()); + } + { + std::lock_guard lock{primary_worker_.mu}; + primary_worker_.pool = primary_pool; + primary_worker_.next_addr = 0; + } + { + std::lock_guard lock{secondary_worker_.mu}; + secondary_worker_.pool = secondary_pool; + secondary_worker_.next_addr = 0; + } + return true; + } + + bool resolve_addrs(std::vector &out) { + out.clear(); + const bool has_explicit_targets = !resolve_targets_.empty(); + const std::string &targets = has_explicit_targets ? resolve_targets_ : host_; + ::addrinfo hints{}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + size_t begin = 0; + for (;;) { + size_t end = targets.find(',', begin); + if (end == std::string::npos) { + end = targets.size(); + } + std::string endpoint = targets.substr(begin, end - begin); + trim_spaces(endpoint); + if (endpoint.empty()) { + if (end == targets.size()) { + break; + } + begin = end + 1; + continue; + } + std::string host = endpoint; + std::string service = std::to_string(port_); + if (has_explicit_targets) { + if (!split_host_port(endpoint, host, service)) { + stats.last_error = "statshouse::TransportTCP invalid endpoint=" + endpoint; + if (end == targets.size()) { + break; + } + begin = end + 1; + continue; + } + } + ::addrinfo *result = nullptr; + const int rc = ::getaddrinfo(host.c_str(), service.c_str(), &hints, &result); + if (rc != 0) { + stats.last_error = "statshouse::TransportTCP getaddrinfo() failed host=" + host + ":" + service + ", error=" + gai_strerror(rc); + continue; + } + for (auto *it = result; it != nullptr; it = it->ai_next) { + if (!it->ai_addr || it->ai_addrlen <= 0 || it->ai_addrlen > sizeof(::sockaddr_storage)) { + continue; + } + resolved_addr a{}; + std::memcpy(&a.storage, it->ai_addr, static_cast(it->ai_addrlen)); + a.len = static_cast(it->ai_addrlen); + out.push_back(a); + } + ::freeaddrinfo(result); + if (end == targets.size()) { + break; + } + begin = end + 1; + } + if (out.empty()) { + stats.last_error = "statshouse::TransportTCP resolve returned empty list host=" + host_; + return false; + } + return true; + } + + void close_socket(int &sock) { + if (sock >= 0) { + (void)::close(sock); + } + sock = -1; + } + + void report_would_block_metric_after_send() { + const size_t n = primary_worker_.would_block_bytes.exchange(0, std::memory_order_acq_rel); + if (n == 0) { + return; + } + std::lock_guard lock(mu); + MetricBuilder kb = metric("__src_client_write_err"); + kb.tag("0", default_env_locked()); + kb.tag("1", "5"); // lang: cpp + kb.tag("2", "1"); // kind: would block + kb.tag("3", app_name_locked()); + const double lost = static_cast(n); + (void)write_values_locked(kb, &lost, 1, 0, 0); + } + + void set_errno_error(int err, const char *msg) { + stats.last_error.clear(); + stats.last_error.append(msg); + stats.last_error.append(" errno="); + stats.last_error.append(std::to_string(err)); + stats.last_error.append(", "); + stats.last_error.append(strerror(err)); + } + static void trim_spaces(std::string &s) { + size_t start = 0; + size_t end = s.size(); + while (start < end && (s[start] == ' ' || s[start] == '\t' || s[start] == '\n' || s[start] == '\r')) { + ++start; + } + while (end > start && (s[end - 1] == ' ' || s[end - 1] == '\t' || s[end - 1] == '\n' || s[end - 1] == '\r')) { + --end; + } + if (start == 0 && end == s.size()) { + return; + } + s = s.substr(start, end - start); + } + static bool split_host_port(const std::string &endpoint, std::string &host, std::string &port) { + if (endpoint.empty()) { + return false; + } + if (endpoint[0] == '[') { + const size_t rb = endpoint.find(']'); + if (rb == std::string::npos || rb + 2 > endpoint.size() || endpoint[rb + 1] != ':') { + return false; + } + host = endpoint.substr(1, rb - 1); + port = endpoint.substr(rb + 2); + } else { + const size_t first_colon = endpoint.find(':'); + const size_t last_colon = endpoint.rfind(':'); + if (first_colon == std::string::npos || first_colon != last_colon) { + return false; + } + host = endpoint.substr(0, first_colon); + port = endpoint.substr(first_colon + 1); + } + trim_spaces(host); + trim_spaces(port); + return !host.empty() && !port.empty(); + } + static bool set_socket_timeouts(int sock, int timeout_ms) { + ::timeval tv{}; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + if (::setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof(tv)) != 0) { + return false; + } + if (::setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof(tv)) != 0) { + return false; + } + return true; + } + static bool connect_with_timeout(int sock, const sockaddr *addr, socklen_t len, int timeout_ms) { + const int flags = ::fcntl(sock, F_GETFL, 0); + if (flags < 0) { + return false; + } + if (::fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) { + return false; + } + int rc = ::connect(sock, addr, len); + if (rc == 0) { + (void)::fcntl(sock, F_SETFL, flags); + return true; + } + if (errno != EINPROGRESS) { + (void)::fcntl(sock, F_SETFL, flags); + return false; + } + fd_set wfds; + FD_ZERO(&wfds); + FD_SET(sock, &wfds); + ::timeval tv{}; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + rc = ::select(sock + 1, nullptr, &wfds, nullptr, &tv); + if (rc <= 0) { + (void)::fcntl(sock, F_SETFL, flags); + if (rc == 0) { + errno = ETIMEDOUT; + } + return false; + } + int so_error = 0; + socklen_t so_len = sizeof(so_error); + if (::getsockopt(sock, SOL_SOCKET, SO_ERROR, reinterpret_cast(&so_error), &so_len) != 0) { + (void)::fcntl(sock, F_SETFL, flags); + return false; + } + (void)::fcntl(sock, F_SETFL, flags); + if (so_error != 0) { + errno = so_error; + return false; + } + return true; + } +}; + class Registry { template friend struct test::traits; enum { @@ -844,9 +1354,13 @@ class Registry { }; public: struct options { + std::string app; + std::string env; std::string default_env; + std::string protocol{"udp"}; // "udp" (default) or "tcp" std::string host{"127.0.0.1"}; int port{TransportUDP::DEFAULT_PORT}; + std::string resolve_targets; // optional comma-separated host:port list for TCP size_t max_udp_packet_size{0}; size_t max_bucket_size{DEFAULT_MAX_BUCKET_SIZE}; std::function logger; @@ -860,11 +1374,23 @@ class Registry { , sampling_disabled{o.sampling_disabled} , logger{o.logger} , incremental_flush_disabled{o.incremental_flush_disabled} - , transport{o.host, o.port} { - if (!o.default_env.empty()) { - transport.set_default_env(o.default_env); - } - if (o.max_udp_packet_size != 0) { + , transport{ + o.protocol == "tcp" ? std::string() : o.host, + o.protocol == "tcp" ? 0 : o.port, + } { + if (o.protocol == "tcp") { + transport_tcp.reset(new TransportTCP(o.host, o.port, o.resolve_targets)); + use_tcp_transport = true; + } + if (!o.app.empty()) { + active_transport().set_app_name(o.app); + } + if (!o.env.empty()) { + active_transport().set_default_env(o.env); + } else if (!o.default_env.empty()) { + active_transport().set_default_env(o.default_env); + } + if (o.max_udp_packet_size != 0 && !use_tcp_transport) { transport.set_max_udp_packet_size(o.max_udp_packet_size); } } @@ -993,7 +1519,7 @@ class Registry { std::vector unique; }; struct bucket { - bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp) + bucket(const TransportUDPBase::MetricBuilder &key, uint32_t timestamp) : key{key} , timestamp{timestamp} , last_flush_timestamp{0} @@ -1001,7 +1527,7 @@ class Registry { , rand{} , queue_ptr{nullptr} { } - TransportUDP::MetricBuilder key; + TransportUDPBase::MetricBuilder key; uint32_t timestamp; uint32_t last_flush_timestamp; int waterlevel; @@ -1014,7 +1540,7 @@ class Registry { bucket_ref() : registry{nullptr} { } - bucket_ref(Registry *registry, const TransportUDP::MetricBuilder &key) + bucket_ref(Registry *registry, const TransportUDPBase::MetricBuilder &key) : registry{registry} , ptr{registry->get_or_create_bucket(key)} { } @@ -1023,7 +1549,7 @@ class Registry { }; struct bucket_waterlevel_ref : bucket_ref { bucket_waterlevel_ref() = default; - bucket_waterlevel_ref(Registry *registry, const TransportUDP::MetricBuilder &key) + bucket_waterlevel_ref(Registry *registry, const TransportUDPBase::MetricBuilder &key) : bucket_ref{registry, key} { init(); } @@ -1063,7 +1589,7 @@ class Registry { public: struct EventCountMetricRef : private bucket_ref { EventCountMetricRef() = default; - EventCountMetricRef(Registry *registry, const TransportUDP::MetricBuilder &key) + EventCountMetricRef(Registry *registry, const TransportUDPBase::MetricBuilder &key) : bucket_ref{registry, key} { } explicit operator bool() const { return ptr.get() != nullptr; } @@ -1079,7 +1605,7 @@ class Registry { }; struct EventMetricRef : private bucket_ref { EventMetricRef() = default; - EventMetricRef(Registry *registry, const TransportUDP::MetricBuilder &key) + EventMetricRef(Registry *registry, const TransportUDPBase::MetricBuilder &key) : bucket_ref{registry, key} { } explicit operator bool() const { return ptr.get() != nullptr; } @@ -1099,7 +1625,7 @@ class Registry { }; struct UniqueMetricRef : private bucket_ref { UniqueMetricRef() = default; - UniqueMetricRef(Registry *registry, const TransportUDP::MetricBuilder &key) + UniqueMetricRef(Registry *registry, const TransportUDPBase::MetricBuilder &key) : bucket_ref{registry, key} { } explicit operator bool() const { return ptr.get() != nullptr; } @@ -1119,7 +1645,7 @@ class Registry { }; struct WaterlevelMetricRef : private bucket_waterlevel_ref { WaterlevelMetricRef() = default; - WaterlevelMetricRef(Registry *registry, const TransportUDP::MetricBuilder &key) + WaterlevelMetricRef(Registry *registry, const TransportUDPBase::MetricBuilder &key) : bucket_waterlevel_ref{registry, key} { } explicit operator bool() const { return ptr.get() != nullptr; } @@ -1146,7 +1672,7 @@ class Registry { public: MetricBuilder(Registry *registry, string_view name) : registry{registry} - , key{®istry->transport, name} { + , key{registry->active_transport_ptr(), name} { } MetricBuilder &tag(string_view str) { key.tag(str); @@ -1209,14 +1735,14 @@ class Registry { } private: Registry *registry; - TransportUDP::MetricBuilder key; + TransportUDPBase::MetricBuilder key; }; MetricBuilder metric(string_view name) { return {this, name}; } bool write_usage_metrics(string_view project, string_view cluster) { std::lock_guard transport_lock{transport_mu}; - return transport.write_usage_metrics(project, cluster); + return active_transport().write_usage_metrics(project, cluster); } // If called once, then you need to call it until the end of the life of the registry. void set_external_time(uint32_t timestamp) { @@ -1231,7 +1757,7 @@ class Registry { flush(time_now()); { std::lock_guard l{transport_mu}; - transport.flush(true); + active_transport().flush(true); } } else if (incremental_flush_disabled) { flush(time_now() - 1); @@ -1248,7 +1774,11 @@ class Registry { } void set_default_env(string_view env) { std::lock_guard transport_lock{transport_mu}; - transport.set_default_env(env); + active_transport().set_default_env(env); + } + void set_app_name(string_view app) { + std::lock_guard transport_lock{transport_mu}; + active_transport().set_app_name(app); } bool set_metrics_logging_enabled(bool enabled) { if (!logger) { @@ -1271,12 +1801,12 @@ class Registry { res.freelist_size = stat.freelist_size.load(std::memory_order_relaxed); res.bucket_count = stat.bucket_count.load(std::memory_order_relaxed); std::lock_guard transport_lock{transport_mu}; - static_cast(res) = transport.get_stats(); + static_cast(res) = active_transport().get_stats(); return res; } void clear_stats() { std::lock_guard transport_lock{transport_mu}; - transport.clear_stats(); + active_transport().clear_stats(); } class Clock { public: @@ -1306,7 +1836,7 @@ class Registry { std::atomic_bool stop{}; }; private: - bool update_multivalue_by_key(const TransportUDP::MetricBuilder &key, multivalue_view value) { + bool update_multivalue_by_key(const TransportUDPBase::MetricBuilder &key, multivalue_view value) { auto timestamp = time_now(); return update_multivalue_by_ref(get_or_create_bucket(key, timestamp), timestamp, value); } @@ -1442,7 +1972,7 @@ class Registry { } return {count_flushed, count_scanned, queue_size}; } - std::shared_ptr get_or_create_bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp = 0) { + std::shared_ptr get_or_create_bucket(const TransportUDPBase::MetricBuilder &key, uint32_t timestamp = 0) { std::lock_guard lock{mu}; auto it = buckets.find(string_view{key.buffer, key.buffer_pos}); if (it != buckets.end()) { @@ -1455,7 +1985,7 @@ class Registry { stat.bucket_count.store(buckets.size(), std::memory_order_relaxed); return ptr; } - std::shared_ptr alloc_bucket(const TransportUDP::MetricBuilder &key, uint32_t timestamp) { + std::shared_ptr alloc_bucket(const TransportUDPBase::MetricBuilder &key, uint32_t timestamp) { if (freelist.empty()) { return std::make_shared(key, timestamp); } @@ -1625,10 +2155,27 @@ class Registry { std::function logger; // "puts" signature bool incremental_flush_disabled; TransportUDP transport; + std::unique_ptr transport_tcp; + bool use_tcp_transport{false}; std::deque> queue; std::vector> freelist; std::unordered_map, string_view_hash, string_view_equal> buckets; statistics stat{}; + + TransportUDPBase *active_transport_ptr() { + if (use_tcp_transport && transport_tcp) { + return transport_tcp.get(); + } + return &transport; + } + const TransportUDPBase *active_transport_ptr() const { + if (use_tcp_transport && transport_tcp) { + return transport_tcp.get(); + } + return &transport; + } + TransportUDPBase &active_transport() { return *active_transport_ptr(); } + const TransportUDPBase &active_transport() const { return *active_transport_ptr(); } }; } // namespace statshouse