From ce34368b5ce184f7b20a3808da468f7100a15044 Mon Sep 17 00:00:00 2001 From: Vinicius Livramento Date: Fri, 6 Feb 2026 14:20:53 +0000 Subject: [PATCH 1/7] FIX: Fix missing schemas in databento-cpp function --- CHANGELOG.md | 5 +++++ src/record.cpp | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c34ca11..d7ae7a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.47.1 - TBD + +### Bug fixes +- Added conversion for missing schemas for function `RTypeFromSchema` + ## 0.47.0 - 2026-02-04 ### Enhancements diff --git a/src/record.cpp b/src/record.cpp index 6f252ed..be0f29a 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -88,15 +88,36 @@ databento::RType Record::RTypeFromSchema(const Schema schema) { case Schema::Ohlcv1D: { return databento::RType::Ohlcv1D; } + case Schema::OhlcvEod: { + return databento::RType::OhlcvEod; + } case Schema::Definition: { return databento::RType::InstrumentDef; } case Schema::Statistics: { return databento::RType::Statistics; } + case Schema::Status: { + return databento::RType::Status; + } case Schema::Imbalance: { return databento::RType::Imbalance; } + case Schema::Cmbp1: { + return databento::RType::Cmbp1; + } + case Schema::Cbbo1S: { + return databento::RType::Cbbo1S; + } + case Schema::Cbbo1M: { + return databento::RType::Cbbo1M; + } + case Schema::Bbo1S: { + return databento::RType::Bbo1S; + } + case Schema::Bbo1M: { + return databento::RType::Bbo1M; + } default: { throw InvalidArgumentError{ "Record::RTypeFromSchema", "schema", From bddfe1287b5cbee2963e870f01853746caf0e503 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 11 Feb 2026 12:22:37 -0600 Subject: [PATCH 2/7] ADD: Add test that closed connection throws --- tests/src/live_blocking_tests.cpp | 50 +++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 07ab8ed..429f594 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -563,6 +563,56 @@ TEST_F(LiveBlockingTests, TestConnectWhenGatewayNotUp) { ASSERT_THROW(builder_.BuildBlocking(), databento::TcpError); } +TEST_F(LiveBlockingTests, TestNextRecordThrowsOnGatewayClose) { + constexpr auto kTsOut = false; + constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; + + bool should_close{}; + std::mutex should_close_mutex; + std::condition_variable should_close_cv; + bool has_closed{}; + std::mutex has_closed_mutex; + std::condition_variable has_closed_cv; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, + [kRec, &should_close, &should_close_cv, &should_close_mutex, &has_closed, + &has_closed_cv, &has_closed_mutex](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.SendRecord(kRec); + { + std::unique_lock lock{should_close_mutex}; + should_close_cv.wait(lock, [&should_close] { return should_close; }); + } + self.Close(); + { + const std::lock_guard lock{has_closed_mutex}; + has_closed = true; + has_closed_cv.notify_one(); + } + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + const auto rec = target.NextRecord(); + ASSERT_TRUE(rec.Holds()); + EXPECT_EQ(rec.Get(), kRec); + // Signal server to close connection + { + const std::lock_guard lock{should_close_mutex}; + should_close = true; + should_close_cv.notify_one(); + } + // Wait for server to close + { + std::unique_lock lock{has_closed_mutex}; + has_closed_cv.wait(lock, [&has_closed] { return has_closed; }); + } + ASSERT_THROW(target.NextRecord(), databento::DbnResponseError); +} + TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { constexpr auto kTsOut = false; constexpr TradeMsg kRec{DummyHeader(RType::Mbp0), From 478f2a20c78e75bd35878d4f7128cfafcda377cd Mon Sep 17 00:00:00 2001 From: Carter Green Date: Fri, 13 Feb 2026 16:55:22 -0600 Subject: [PATCH 3/7] ADD: Add slow reader behavior support to clients --- CHANGELOG.md | 5 +++ include/databento/enums.hpp | 11 +++++++ include/databento/live.hpp | 5 +++ include/databento/live_blocking.hpp | 10 ++++-- include/databento/live_threaded.hpp | 7 ++-- src/enums.cpp | 19 +++++++++++ src/live.cpp | 13 +++++--- src/live_blocking.cpp | 30 +++++++++++------- src/live_threaded.cpp | 44 +++++++++++++++----------- tests/include/mock/mock_lsg_server.hpp | 3 ++ tests/src/live_blocking_tests.cpp | 15 +++++++++ tests/src/mock_lsg_server.cpp | 20 +++++++++++- 12 files changed, 142 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7ae7a6..9a72c60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## 0.47.1 - TBD +### Enhancements +- Added `SlowReadBehavior` enum and `LiveBuilder::SetSlowReadBehavior()` to configure + gateway behavior when client falls behind +- Added `SlowReadBehavior()` getter to `LiveBlocking` and `LiveThreaded` + ### Bug fixes - Added conversion for missing schemas for function `RTypeFromSchema` diff --git a/include/databento/enums.hpp b/include/databento/enums.hpp index 371bc69..ea045bf 100644 --- a/include/databento/enums.hpp +++ b/include/databento/enums.hpp @@ -48,6 +48,15 @@ enum class DatasetCondition : std::uint8_t { Missing, }; +// Live session parameter which controls gateway behavior when the client +// falls behind real time. +enum class SlowReadBehavior : std::uint8_t { + // Send a warning but continue reading. + Warn = 0, + // Skip records to catch up. + Skip = 1, +}; + // A record type sentinel. namespace r_type { enum RType : std::uint8_t { @@ -662,6 +671,7 @@ const char* ToString(SplitDuration duration_interval); const char* ToString(Delivery delivery); const char* ToString(JobState state); const char* ToString(DatasetCondition condition); +const char* ToString(SlowReadBehavior slow_read_behavior); const char* ToString(RType r_type); const char* ToString(Side side); const char* ToString(Action action); @@ -688,6 +698,7 @@ std::ostream& operator<<(std::ostream& out, SplitDuration duration_interval); std::ostream& operator<<(std::ostream& out, Delivery delivery); std::ostream& operator<<(std::ostream& out, JobState state); std::ostream& operator<<(std::ostream& out, DatasetCondition condition); +std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior); std::ostream& operator<<(std::ostream& out, RType r_type); std::ostream& operator<<(std::ostream& out, Side side); std::ostream& operator<<(std::ostream& out, Action action); diff --git a/include/databento/live.hpp b/include/databento/live.hpp index bd66e68..acce846 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include "databento/enums.hpp" // VersionUpgradePolicy @@ -53,6 +55,8 @@ class LiveBuilder { LiveBuilder& ExtendUserAgent(std::string extension); // Sets the compression mode for the read stream. LiveBuilder& SetCompression(Compression compression); + // Sets the behavior of the gateway when the client falls behind real time. + LiveBuilder& SetSlowReadBehavior(SlowReadBehavior slow_read_behavior); /* * Build a live client instance @@ -80,5 +84,6 @@ class LiveBuilder { std::size_t buffer_size_; std::string user_agent_ext_; Compression compression_{Compression::None}; + std::optional slow_read_behavior_{}; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 4f6d81d..e962d49 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -45,6 +45,9 @@ class LiveBlocking { return heartbeat_interval_; } databento::Compression Compression() const { return compression_; } + std::optional SlowReadBehavior() const { + return slow_read_behavior_; + } const std::vector& Subscriptions() const { return subscriptions_; } std::vector& Subscriptions() { return subscriptions_; } @@ -95,13 +98,15 @@ class LiveBlocking { bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression); + databento::Compression compression, + std::optional slow_read_behavior); LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression); + databento::Compression compression, + std::optional slow_read_behavior); std::string DetermineGateway() const; std::uint64_t Authenticate(); @@ -128,6 +133,7 @@ class LiveBlocking { const VersionUpgradePolicy upgrade_policy_; const std::optional heartbeat_interval_; const databento::Compression compression_; + const std::optional slow_read_behavior_; detail::LiveConnection connection_; std::uint32_t sub_counter_{}; std::vector subscriptions_; diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index 231790a..81dbf90 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -55,6 +55,7 @@ class LiveThreaded { VersionUpgradePolicy UpgradePolicy() const; std::optional HeartbeatInterval() const; databento::Compression Compression() const; + std::optional SlowReadBehavior() const; const std::vector& Subscriptions() const; std::vector& Subscriptions(); @@ -109,13 +110,15 @@ class LiveThreaded { bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression); + databento::Compression compression, + std::optional slow_read_behavior); LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression); + databento::Compression compression, + std::optional slow_read_behavior); // unique_ptr to be movable std::unique_ptr impl_; diff --git a/src/enums.cpp b/src/enums.cpp index 36623ed..fbaea2c 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -108,6 +108,20 @@ const char* ToString(DatasetCondition condition) { } } +const char* ToString(SlowReadBehavior slow_read_behavior) { + switch (slow_read_behavior) { + case SlowReadBehavior::Warn: { + return "warn"; + } + case SlowReadBehavior::Skip: { + return "skip"; + } + default: { + return "Unknown"; + } + } +} + const char* ToString(RType r_type) { switch (r_type) { case RType::Mbp0: { @@ -847,6 +861,11 @@ std::ostream& operator<<(std::ostream& out, DatasetCondition condition) { return out; } +std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior) { + out << ToString(slow_read_behavior); + return out; +} + std::ostream& operator<<(std::ostream& out, RType r_type) { out << ToString(r_type); return out; diff --git a/src/live.cpp b/src/live.cpp index 26e8d4d..85140f3 100644 --- a/src/live.cpp +++ b/src/live.cpp @@ -84,6 +84,11 @@ LiveBuilder& LiveBuilder::SetCompression(Compression compression) { return *this; } +LiveBuilder& LiveBuilder::SetSlowReadBehavior(SlowReadBehavior slow_read_behavior) { + slow_read_behavior_ = slow_read_behavior; + return *this; +} + databento::LiveBlocking LiveBuilder::BuildBlocking() { Validate(); if (gateway_.empty()) { @@ -91,14 +96,14 @@ databento::LiveBlocking LiveBuilder::BuildBlocking() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_}; + compression_, slow_read_behavior_}; } return databento::LiveBlocking{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_}; + compression_, slow_read_behavior_}; } databento::LiveThreaded LiveBuilder::BuildThreaded() { @@ -108,14 +113,14 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_}; + compression_, slow_read_behavior_}; } return databento::LiveThreaded{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_}; + compression_, slow_read_behavior_}; } void LiveBuilder::Validate() { diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 451932f..4dec2eb 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -28,12 +28,12 @@ constexpr std::size_t kBucketIdLength = 5; databento::LiveBuilder LiveBlocking::Builder() { return databento::LiveBuilder{}; } -LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, - std::string dataset, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression) +LiveBlocking::LiveBlocking( + ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, std::size_t buffer_size, + std::string user_agent_ext, databento::Compression compression, + std::optional slow_read_behavior) : log_receiver_{log_receiver}, key_{std::move(key)}, @@ -45,16 +45,18 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, compression_{compression}, + slow_read_behavior_{slow_read_behavior}, connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} -LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, - std::string dataset, std::string gateway, std::uint16_t port, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression) +LiveBlocking::LiveBlocking( + ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, std::size_t buffer_size, + std::string user_agent_ext, databento::Compression compression, + std::optional slow_read_behavior) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -65,6 +67,7 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, compression_{compression}, + slow_read_behavior_{slow_read_behavior}, connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -340,6 +343,9 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { if (heartbeat_interval_.has_value()) { req_stream << "|heartbeat_interval_s=" << heartbeat_interval_->count(); } + if (slow_read_behavior_.has_value()) { + req_stream << "|slow_read_behavior=" << *slow_read_behavior_; + } req_stream << '\n'; return req_stream.str(); } diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index 69d4681..4365300 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -57,26 +57,28 @@ LiveThreaded::~LiveThreaded() { } } -LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, - std::string dataset, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression) - : impl_{std::make_unique( - log_receiver, std::move(key), std::move(dataset), send_ts_out, upgrade_policy, - heartbeat_interval, buffer_size, std::move(user_agent_ext), compression)} {} - -LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, - std::string dataset, std::string gateway, std::uint16_t port, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext, - databento::Compression compression) +LiveThreaded::LiveThreaded( + ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, std::size_t buffer_size, + std::string user_agent_ext, databento::Compression compression, + std::optional slow_read_behavior) : impl_{std::make_unique(log_receiver, std::move(key), std::move(dataset), - std::move(gateway), port, send_ts_out, - upgrade_policy, heartbeat_interval, buffer_size, - std::move(user_agent_ext), compression)} {} + send_ts_out, upgrade_policy, heartbeat_interval, + buffer_size, std::move(user_agent_ext), compression, + slow_read_behavior)} {} + +LiveThreaded::LiveThreaded( + ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, std::size_t buffer_size, + std::string user_agent_ext, databento::Compression compression, + std::optional slow_read_behavior) + : impl_{std::make_unique( + log_receiver, std::move(key), std::move(dataset), std::move(gateway), port, + send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, + std::move(user_agent_ext), compression, slow_read_behavior)} {} const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); } @@ -100,6 +102,10 @@ databento::Compression LiveThreaded::Compression() const { return impl_->blocking.Compression(); } +std::optional LiveThreaded::SlowReadBehavior() const { + return impl_->blocking.SlowReadBehavior(); +} + const std::vector& LiveThreaded::Subscriptions() const { return impl_->blocking.Subscriptions(); } diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index 244babc..18ce392 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -49,6 +49,8 @@ class MockLsgServer { std::function serve_fn); MockLsgServer(std::string dataset, bool ts_out, Compression compression, std::function serve_fn); + MockLsgServer(std::string dataset, bool ts_out, SlowReadBehavior slow_read_behavior, + std::function serve_fn); std::uint16_t Port() const { return port_; } @@ -119,6 +121,7 @@ class MockLsgServer { bool ts_out_; std::chrono::seconds heartbeat_interval_; Compression compression_{Compression::None}; + std::optional slow_read_behavior_{}; std::uint16_t port_{}; detail::ScopedFd socket_{}; detail::ScopedFd conn_fd_{}; diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 429f594..e28a57f 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -56,6 +56,21 @@ TEST_F(LiveBlockingTests, TestAuthentication) { .BuildBlocking(); } +TEST_F(LiveBlockingTests, TestAuthenticationWithSlowReadBehavior) { + constexpr auto kTsOut = false; + constexpr auto kSlowReadBehavior = SlowReadBehavior::Warn; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, kSlowReadBehavior, + [](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + }}; + + const LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSlowReadBehavior(kSlowReadBehavior) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); +} + TEST_F(LiveBlockingTests, TestStartAndUpgrade) { constexpr auto kTsOut = true; for (const auto [upgrade_policy, exp_version] : diff --git a/tests/src/mock_lsg_server.cpp b/tests/src/mock_lsg_server.cpp index 4ee08db..fc6a881 100644 --- a/tests/src/mock_lsg_server.cpp +++ b/tests/src/mock_lsg_server.cpp @@ -38,7 +38,8 @@ using databento::tests::mock::MockLsgServer; MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, std::function serve_fn) - : MockLsgServer{std::move(dataset), ts_out, {}, std::move(serve_fn)} {} + : MockLsgServer{std::move(dataset), ts_out, std::chrono::seconds{}, + std::move(serve_fn)} {} MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, std::chrono::seconds heartbeat_interval, @@ -58,6 +59,16 @@ MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, Compression compr socket_{InitSocketAndSetPort()}, thread_{std::move(serve_fn), std::ref(*this)} {} +MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, + SlowReadBehavior slow_read_behavior, + std::function serve_fn) + : dataset_{std::move(dataset)}, + ts_out_{ts_out}, + heartbeat_interval_{}, + slow_read_behavior_{slow_read_behavior}, + socket_{InitSocketAndSetPort()}, + thread_{std::move(serve_fn), std::ref(*this)} {} + void MockLsgServer::Accept() { sockaddr_in addr{}; auto addr_len = static_cast(sizeof(addr)); @@ -128,6 +139,13 @@ void MockLsgServer::Authenticate() { } else { EXPECT_EQ(received.find("heartbeat_interval_s="), std::string::npos); } + if (slow_read_behavior_.has_value()) { + EXPECT_NE(received.find("slow_read_behavior=" + + std::string{ToString(*slow_read_behavior_)}), + std::string::npos); + } else { + EXPECT_EQ(received.find("slow_read_behavior="), std::string::npos); + } Send("success=1|session_id=5|\n"); } From 2868ef8f17a551745a60fa5e61c04bf2b792fb2b Mon Sep 17 00:00:00 2001 From: Enrico Date: Tue, 17 Feb 2026 13:38:03 -0600 Subject: [PATCH 4/7] FIX: Fix json explicit optional construction --- CHANGELOG.md | 1 + src/detail/json_helpers.cpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a72c60..a0371ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Bug fixes - Added conversion for missing schemas for function `RTypeFromSchema` +- Added explicit optional construction in `json_helpers.hpp` (credit: Enrico Detoma) ## 0.47.0 - 2026-02-04 diff --git a/src/detail/json_helpers.cpp b/src/detail/json_helpers.cpp index 7d9bf75..dbe40db 100644 --- a/src/detail/json_helpers.cpp +++ b/src/detail/json_helpers.cpp @@ -66,7 +66,7 @@ std::optional ParseAt(std::string_view endpoint, throw JsonResponseError::TypeMismatch(endpoint, std::string{key} + " string", val_json); } - return val_json; + return std::optional{val_json}; } template <> From a8b95ad75ee824375705a9b364507813e1222a77 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 17 Feb 2026 13:53:04 -0600 Subject: [PATCH 5/7] MOD: Update protocol for slow reader differences --- CHANGELOG.md | 4 ++-- include/databento/enums.hpp | 6 +++--- include/databento/live.hpp | 4 ++-- include/databento/live_blocking.hpp | 10 +++++----- include/databento/live_threaded.hpp | 6 +++--- src/enums.cpp | 14 +++++++------- src/live.cpp | 13 +++++++------ src/live_blocking.cpp | 12 ++++++------ src/live_threaded.cpp | 12 ++++++------ tests/include/mock/mock_lsg_server.hpp | 5 +++-- tests/src/live_blocking_tests.cpp | 8 ++++---- tests/src/mock_lsg_server.cpp | 12 ++++++------ 12 files changed, 54 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0371ee..03d16a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,9 @@ ## 0.47.1 - TBD ### Enhancements -- Added `SlowReadBehavior` enum and `LiveBuilder::SetSlowReadBehavior()` to configure +- Added `SlowReaderBehavior` enum and `LiveBuilder::SetSlowReaderBehavior()` to configure gateway behavior when client falls behind -- Added `SlowReadBehavior()` getter to `LiveBlocking` and `LiveThreaded` +- Added `SlowReaderBehavior()` getter to `LiveBlocking` and `LiveThreaded` ### Bug fixes - Added conversion for missing schemas for function `RTypeFromSchema` diff --git a/include/databento/enums.hpp b/include/databento/enums.hpp index ea045bf..e04ed9d 100644 --- a/include/databento/enums.hpp +++ b/include/databento/enums.hpp @@ -50,7 +50,7 @@ enum class DatasetCondition : std::uint8_t { // Live session parameter which controls gateway behavior when the client // falls behind real time. -enum class SlowReadBehavior : std::uint8_t { +enum class SlowReaderBehavior : std::uint8_t { // Send a warning but continue reading. Warn = 0, // Skip records to catch up. @@ -671,7 +671,7 @@ const char* ToString(SplitDuration duration_interval); const char* ToString(Delivery delivery); const char* ToString(JobState state); const char* ToString(DatasetCondition condition); -const char* ToString(SlowReadBehavior slow_read_behavior); +const char* ToString(SlowReaderBehavior slow_reader_behavior); const char* ToString(RType r_type); const char* ToString(Side side); const char* ToString(Action action); @@ -698,7 +698,7 @@ std::ostream& operator<<(std::ostream& out, SplitDuration duration_interval); std::ostream& operator<<(std::ostream& out, Delivery delivery); std::ostream& operator<<(std::ostream& out, JobState state); std::ostream& operator<<(std::ostream& out, DatasetCondition condition); -std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior); +std::ostream& operator<<(std::ostream& out, SlowReaderBehavior slow_reader_behavior); std::ostream& operator<<(std::ostream& out, RType r_type); std::ostream& operator<<(std::ostream& out, Side side); std::ostream& operator<<(std::ostream& out, Action action); diff --git a/include/databento/live.hpp b/include/databento/live.hpp index acce846..d9f31ed 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -56,7 +56,7 @@ class LiveBuilder { // Sets the compression mode for the read stream. LiveBuilder& SetCompression(Compression compression); // Sets the behavior of the gateway when the client falls behind real time. - LiveBuilder& SetSlowReadBehavior(SlowReadBehavior slow_read_behavior); + LiveBuilder& SetSlowReaderBehavior(SlowReaderBehavior slow_reader_behavior); /* * Build a live client instance @@ -84,6 +84,6 @@ class LiveBuilder { std::size_t buffer_size_; std::string user_agent_ext_; Compression compression_{Compression::None}; - std::optional slow_read_behavior_{}; + std::optional slow_reader_behavior_{}; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index e962d49..f6ef7ea 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -45,8 +45,8 @@ class LiveBlocking { return heartbeat_interval_; } databento::Compression Compression() const { return compression_; } - std::optional SlowReadBehavior() const { - return slow_read_behavior_; + std::optional SlowReaderBehavior() const { + return slow_reader_behavior_; } const std::vector& Subscriptions() const { return subscriptions_; } std::vector& Subscriptions() { return subscriptions_; } @@ -99,14 +99,14 @@ class LiveBlocking { std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior); + std::optional slow_reader_behavior); LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior); + std::optional slow_reader_behavior); std::string DetermineGateway() const; std::uint64_t Authenticate(); @@ -133,7 +133,7 @@ class LiveBlocking { const VersionUpgradePolicy upgrade_policy_; const std::optional heartbeat_interval_; const databento::Compression compression_; - const std::optional slow_read_behavior_; + const std::optional slow_reader_behavior_; detail::LiveConnection connection_; std::uint32_t sub_counter_{}; std::vector subscriptions_; diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index 81dbf90..e427597 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -55,7 +55,7 @@ class LiveThreaded { VersionUpgradePolicy UpgradePolicy() const; std::optional HeartbeatInterval() const; databento::Compression Compression() const; - std::optional SlowReadBehavior() const; + std::optional SlowReaderBehavior() const; const std::vector& Subscriptions() const; std::vector& Subscriptions(); @@ -111,14 +111,14 @@ class LiveThreaded { std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior); + std::optional slow_reader_behavior); LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior); + std::optional slow_reader_behavior); // unique_ptr to be movable std::unique_ptr impl_; diff --git a/src/enums.cpp b/src/enums.cpp index fbaea2c..c05f00c 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -108,13 +108,13 @@ const char* ToString(DatasetCondition condition) { } } -const char* ToString(SlowReadBehavior slow_read_behavior) { - switch (slow_read_behavior) { - case SlowReadBehavior::Warn: { +const char* ToString(SlowReaderBehavior slow_reader_behavior) { + switch (slow_reader_behavior) { + case SlowReaderBehavior::Warn: { return "warn"; } - case SlowReadBehavior::Skip: { - return "skip"; + case SlowReaderBehavior::Skip: { + return "drop"; } default: { return "Unknown"; @@ -861,8 +861,8 @@ std::ostream& operator<<(std::ostream& out, DatasetCondition condition) { return out; } -std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior) { - out << ToString(slow_read_behavior); +std::ostream& operator<<(std::ostream& out, SlowReaderBehavior slow_reader_behavior) { + out << ToString(slow_reader_behavior); return out; } diff --git a/src/live.cpp b/src/live.cpp index 85140f3..cfd8515 100644 --- a/src/live.cpp +++ b/src/live.cpp @@ -84,8 +84,9 @@ LiveBuilder& LiveBuilder::SetCompression(Compression compression) { return *this; } -LiveBuilder& LiveBuilder::SetSlowReadBehavior(SlowReadBehavior slow_read_behavior) { - slow_read_behavior_ = slow_read_behavior; +LiveBuilder& LiveBuilder::SetSlowReaderBehavior( + SlowReaderBehavior slow_reader_behavior) { + slow_reader_behavior_ = slow_reader_behavior; return *this; } @@ -96,14 +97,14 @@ databento::LiveBlocking LiveBuilder::BuildBlocking() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_read_behavior_}; + compression_, slow_reader_behavior_}; } return databento::LiveBlocking{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_read_behavior_}; + compression_, slow_reader_behavior_}; } databento::LiveThreaded LiveBuilder::BuildThreaded() { @@ -113,14 +114,14 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_read_behavior_}; + compression_, slow_reader_behavior_}; } return databento::LiveThreaded{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_read_behavior_}; + compression_, slow_reader_behavior_}; } void LiveBuilder::Validate() { diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 4dec2eb..3e4b9e1 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -33,7 +33,7 @@ LiveBlocking::LiveBlocking( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior) + std::optional slow_reader_behavior) : log_receiver_{log_receiver}, key_{std::move(key)}, @@ -45,7 +45,7 @@ LiveBlocking::LiveBlocking( upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, compression_{compression}, - slow_read_behavior_{slow_read_behavior}, + slow_reader_behavior_{slow_reader_behavior}, connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -56,7 +56,7 @@ LiveBlocking::LiveBlocking( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior) + std::optional slow_reader_behavior) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -67,7 +67,7 @@ LiveBlocking::LiveBlocking( upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, compression_{compression}, - slow_read_behavior_{slow_read_behavior}, + slow_reader_behavior_{slow_reader_behavior}, connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -343,8 +343,8 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { if (heartbeat_interval_.has_value()) { req_stream << "|heartbeat_interval_s=" << heartbeat_interval_->count(); } - if (slow_read_behavior_.has_value()) { - req_stream << "|slow_read_behavior=" << *slow_read_behavior_; + if (slow_reader_behavior_.has_value()) { + req_stream << "|slow_reader_behavior=" << *slow_reader_behavior_; } req_stream << '\n'; return req_stream.str(); diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index 4365300..0ec15d0 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -62,11 +62,11 @@ LiveThreaded::LiveThreaded( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior) + std::optional slow_reader_behavior) : impl_{std::make_unique(log_receiver, std::move(key), std::move(dataset), send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, std::move(user_agent_ext), compression, - slow_read_behavior)} {} + slow_reader_behavior)} {} LiveThreaded::LiveThreaded( ILogReceiver* log_receiver, std::string key, std::string dataset, @@ -74,11 +74,11 @@ LiveThreaded::LiveThreaded( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_read_behavior) + std::optional slow_reader_behavior) : impl_{std::make_unique( log_receiver, std::move(key), std::move(dataset), std::move(gateway), port, send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, - std::move(user_agent_ext), compression, slow_read_behavior)} {} + std::move(user_agent_ext), compression, slow_reader_behavior)} {} const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); } @@ -102,8 +102,8 @@ databento::Compression LiveThreaded::Compression() const { return impl_->blocking.Compression(); } -std::optional LiveThreaded::SlowReadBehavior() const { - return impl_->blocking.SlowReadBehavior(); +std::optional LiveThreaded::SlowReaderBehavior() const { + return impl_->blocking.SlowReaderBehavior(); } const std::vector& LiveThreaded::Subscriptions() const { diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index 18ce392..66cec4a 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -49,7 +49,8 @@ class MockLsgServer { std::function serve_fn); MockLsgServer(std::string dataset, bool ts_out, Compression compression, std::function serve_fn); - MockLsgServer(std::string dataset, bool ts_out, SlowReadBehavior slow_read_behavior, + MockLsgServer(std::string dataset, bool ts_out, + SlowReaderBehavior slow_reader_behavior, std::function serve_fn); std::uint16_t Port() const { return port_; } @@ -121,7 +122,7 @@ class MockLsgServer { bool ts_out_; std::chrono::seconds heartbeat_interval_; Compression compression_{Compression::None}; - std::optional slow_read_behavior_{}; + std::optional slow_reader_behavior_{}; std::uint16_t port_{}; detail::ScopedFd socket_{}; detail::ScopedFd conn_fd_{}; diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index e28a57f..5f1cf2b 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -56,17 +56,17 @@ TEST_F(LiveBlockingTests, TestAuthentication) { .BuildBlocking(); } -TEST_F(LiveBlockingTests, TestAuthenticationWithSlowReadBehavior) { +TEST_F(LiveBlockingTests, TestAuthenticationWithSlowReaderBehavior) { constexpr auto kTsOut = false; - constexpr auto kSlowReadBehavior = SlowReadBehavior::Warn; - const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, kSlowReadBehavior, + constexpr auto kSlowReaderBehavior = SlowReaderBehavior::Warn; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, kSlowReaderBehavior, [](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); }}; const LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) - .SetSlowReadBehavior(kSlowReadBehavior) + .SetSlowReaderBehavior(kSlowReaderBehavior) .SetAddress(kLocalhost, mock_server.Port()) .BuildBlocking(); } diff --git a/tests/src/mock_lsg_server.cpp b/tests/src/mock_lsg_server.cpp index fc6a881..ea73ebe 100644 --- a/tests/src/mock_lsg_server.cpp +++ b/tests/src/mock_lsg_server.cpp @@ -60,12 +60,12 @@ MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, Compression compr thread_{std::move(serve_fn), std::ref(*this)} {} MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, - SlowReadBehavior slow_read_behavior, + SlowReaderBehavior slow_reader_behavior, std::function serve_fn) : dataset_{std::move(dataset)}, ts_out_{ts_out}, heartbeat_interval_{}, - slow_read_behavior_{slow_read_behavior}, + slow_reader_behavior_{slow_reader_behavior}, socket_{InitSocketAndSetPort()}, thread_{std::move(serve_fn), std::ref(*this)} {} @@ -139,12 +139,12 @@ void MockLsgServer::Authenticate() { } else { EXPECT_EQ(received.find("heartbeat_interval_s="), std::string::npos); } - if (slow_read_behavior_.has_value()) { - EXPECT_NE(received.find("slow_read_behavior=" + - std::string{ToString(*slow_read_behavior_)}), + if (slow_reader_behavior_.has_value()) { + EXPECT_NE(received.find("slow_reader_behavior=" + + std::string{ToString(*slow_reader_behavior_)}), std::string::npos); } else { - EXPECT_EQ(received.find("slow_read_behavior="), std::string::npos); + EXPECT_EQ(received.find("slow_reader_behavior="), std::string::npos); } Send("success=1|session_id=5|\n"); } From c197fabf5d0d8c2d80afe63ec8182fc33cc0b1d7 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 18 Feb 2026 08:49:16 -0600 Subject: [PATCH 6/7] DEL: Remove file copies from bad sync --- live.hpp | 78 ------------------------ live_blocking.hpp | 138 ------------------------------------------ live_subscription.hpp | 23 ------- live_threaded.hpp | 123 ------------------------------------- 4 files changed, 362 deletions(-) delete mode 100644 live.hpp delete mode 100644 live_blocking.hpp delete mode 100644 live_subscription.hpp delete mode 100644 live_threaded.hpp diff --git a/live.hpp b/live.hpp deleted file mode 100644 index 9496fd5..0000000 --- a/live.hpp +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "databento/enums.hpp" // VersionUpgradePolicy -#include "databento/live_blocking.hpp" -#include "databento/live_threaded.hpp" -#include "databento/publishers.hpp" - -namespace databento { -// Forward declarations -class ILogReceiver; - -// A helper class for constructing a Live client, either an instance of -// LiveBlocking or LiveThreaded. -class LiveBuilder { - public: - LiveBuilder(); - - /* - * Required settters - */ - - // Sets `key_` based on the environment variable DATABENTO_API_KEY. - // - // NOTE: This is not thread-safe if `std::setenv` is used elsewhere in the - // program. - LiveBuilder& SetKeyFromEnv(); - LiveBuilder& SetKey(std::string key); - LiveBuilder& SetDataset(Dataset dataset); - LiveBuilder& SetDataset(std::string dataset); - - /* - * Optional settters - */ - - // Whether to append the gateway send timestamp after each DBN message. - LiveBuilder& SetSendTsOut(bool send_ts_out); - // Set the version upgrade policy for when receiving DBN data from a prior - // version. Defaults to upgrading to DBNv2 (if not already). - LiveBuilder& SetUpgradePolicy(VersionUpgradePolicy upgrade_policy); - // Sets the receiver of the logs to be used by the client. - LiveBuilder& SetLogReceiver(ILogReceiver* log_receiver); - // Overrides the heartbeat interval. - LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval); - // Overrides the gateway and port. This is an advanced method. - LiveBuilder& SetAddress(std::string gateway, std::uint16_t port); - // Overrides the size of the buffer used for reading data from the TCP socket. - LiveBuilder& SetBufferSize(std::size_t size); - - /* - * Build a live client instance - */ - - // Attempts to construct an instance of a blocking live client or throws an - // exception. - LiveBlocking BuildBlocking(); - // Attempts to construct an instance of a threaded live client or throws an - // exception. - LiveThreaded BuildThreaded(); - - private: - void Validate(); - - ILogReceiver* log_receiver_{}; - std::string gateway_{}; - std::uint16_t port_{}; - std::string key_; - std::string dataset_; - - bool send_ts_out_{false}; - VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3}; - std::optional heartbeat_interval_{}; - std::size_t buffer_size_; -}; -} // namespace databento diff --git a/live_blocking.hpp b/live_blocking.hpp deleted file mode 100644 index 2ec5517..0000000 --- a/live_blocking.hpp +++ /dev/null @@ -1,138 +0,0 @@ -#pragma once - -#include -#include // milliseconds -#include -#include -#include -#include -#include -#include // pair -#include - -#include "databento/datetime.hpp" // UnixNanos -#include "databento/dbn.hpp" // Metadata -#include "databento/detail/buffer.hpp" -#include "databento/detail/tcp_client.hpp" // TcpClient -#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy -#include "databento/live_subscription.hpp" -#include "databento/record.hpp" // Record, RecordHeader - -namespace databento { -// Forward declaration -class ILogReceiver; -class LiveBuilder; -class LiveThreaded; - -// A client for interfacing with Databento's real-time and intraday replay -// market data API. This client provides a blocking API for getting the next -// record. Unlike Historical, each instance of LiveBlocking is associated with a -// particular dataset. -class LiveBlocking { - public: - /* - * Getters - */ - - const std::string& Key() const { return key_; } - const std::string& Dataset() const { return dataset_; } - const std::string& Gateway() const { return gateway_; } - std::uint16_t Port() const { return port_; } - bool SendTsOut() const { return send_ts_out_; } - VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; } - // The the first member of the pair will be true, when the heartbeat interval - // was overridden. - std::optional HeartbeatInterval() const { - return heartbeat_interval_; - } - const std::vector& Subscriptions() const { - return subscriptions_; - } - std::vector& Subscriptions() { return subscriptions_; } - - /* - * Methods - */ - - // Add a new subscription. A single client instance supports multiple - // subscriptions. Note there is no unsubscribe method. Subscriptions end - // when the client disconnects in its destructor. - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in); - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in, UnixNanos start); - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in, const std::string& start); - void SubscribeWithSnapshot(const std::vector& symbols, - Schema schema, SType stype_in); - // Notifies the gateway to start sending messages for all subscriptions. - // - // This method should only be called once per instance. - Metadata Start(); - // Block on getting the next record. The returned reference is valid until - // this method is called again. - // - // This method should only be called after `Start`. - const Record& NextRecord(); - // Block on getting the next record. The returned pointer is valid until - // this method is called again. Will return `nullptr` if the `timeout` is - // reached. - // - // This method should only be called after `Start`. - const Record* NextRecord(std::chrono::milliseconds timeout); - // Stops the session with the gateway. Once stopped, the session cannot be - // restarted. - void Stop(); - // Closes the current connection and attempts to reconnect to the gateway. - void Reconnect(); - // Resubscribes to all subscriptions, removing the original `start` time, if - // any. Usually performed after a `Reconnect()`. - void Resubscribe(); - - private: - friend LiveBuilder; - friend LiveThreaded; - - LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size); - LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, - std::string gateway, std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size); - - std::string DetermineGateway() const; - std::uint64_t Authenticate(); - std::string DecodeChallenge(); - std::string GenerateCramReply(std::string_view challenge_key); - std::string EncodeAuthReq(std::string_view auth); - std::uint64_t DecodeAuthResp(); - void IncrementSubCounter(); - void Subscribe(std::string_view sub_msg, - const std::vector& symbols, bool use_snapshot); - detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout); - RecordHeader* BufferRecordHeader(); - - static constexpr std::size_t kMaxStrLen = 24L * 1024; - - ILogReceiver* log_receiver_; - std::string key_; - std::string dataset_; - std::string gateway_; - std::uint16_t port_; - bool send_ts_out_; - std::uint8_t version_{}; - VersionUpgradePolicy upgrade_policy_; - std::optional heartbeat_interval_; - detail::TcpClient client_; - std::uint32_t sub_counter_{}; - std::vector subscriptions_; - detail::Buffer buffer_; - // Must be 8-byte aligned for records - alignas(RecordHeader) std::array compat_buffer_{}; - std::uint64_t session_id_; - Record current_record_{nullptr}; -}; -} // namespace databento diff --git a/live_subscription.hpp b/live_subscription.hpp deleted file mode 100644 index 50d7313..0000000 --- a/live_subscription.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "databento/datetime.hpp" // UnixNanos -#include "databento/enums.hpp" // Schema, SType - -namespace databento { -struct LiveSubscription { - struct Snapshot {}; - struct NoStart {}; - using Start = std::variant; - - std::vector symbols; - Schema schema; - SType stype_in; - Start start; - std::uint32_t id{}; -}; -} // namespace databento diff --git a/live_threaded.hpp b/live_threaded.hpp deleted file mode 100644 index 05e9bba..0000000 --- a/live_threaded.hpp +++ /dev/null @@ -1,123 +0,0 @@ -#pragma once - -#include -#include -#include // function -#include // unique_ptr -#include -#include -#include -#include // pair -#include - -#include "databento/datetime.hpp" // UnixNanos -#include "databento/detail/scoped_thread.hpp" // ScopedThread -#include "databento/enums.hpp" // Schema, SType -#include "databento/live_subscription.hpp" -#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback - -namespace databento { -// Forward declaration -class ILogReceiver; -class LiveBuilder; - -// A client for interfacing with Databento's real-time and intraday replay -// market data API. This client provides a threaded event-driven API for -// receiving the next record. Unlike Historical, each instance of LiveThreaded -// is associated with a particular dataset. -class LiveThreaded { - public: - enum class ExceptionAction : std::uint8_t { - // Start a new session. Return this instead of calling `Start`, which would - // cause a deadlock. - Restart, - // Close the connection and stop the callback thread. - Stop, - }; - using ExceptionCallback = - std::function; - - LiveThreaded(const LiveThreaded&) = delete; - LiveThreaded& operator=(const LiveThreaded&) = delete; - LiveThreaded(LiveThreaded&& other) noexcept; - LiveThreaded& operator=(LiveThreaded&& rhs) noexcept; - ~LiveThreaded(); - - /* - * Getters - */ - - const std::string& Key() const; - const std::string& Dataset() const; - const std::string& Gateway() const; - std::uint16_t Port() const; - bool SendTsOut() const; - VersionUpgradePolicy UpgradePolicy() const; - // The the first member of the pair will be true, when the heartbeat interval - // was overridden. - std::optional HeartbeatInterval() const; - const std::vector& Subscriptions() const; - std::vector& Subscriptions(); - - /* - * Methods - */ - - // Add a new subscription. A single client instance supports multiple - // subscriptions. Note there is no unsubscribe method. Subscriptions end - // when the client disconnects when it's destroyed. - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in); - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in, UnixNanos start); - void Subscribe(const std::vector& symbols, Schema schema, - SType stype_in, const std::string& start); - void SubscribeWithSnapshot(const std::vector& symbols, - Schema schema, SType stype_in); - // Notifies the gateway to start sending messages for all subscriptions. - // `metadata_callback` will be called exactly once, before any calls to - // `record_callback`. `record_callback` will be called for records from all - // subscriptions. - // - // This method should only be called once per instance. - void Start(RecordCallback record_callback); - void Start(MetadataCallback metadata_callback, - RecordCallback record_callback); - void Start(MetadataCallback metadata_callback, RecordCallback record_callback, - ExceptionCallback exception_callback); - // Closes the current connection, and attempts to reconnect to the gateway. - void Reconnect(); - void Resubscribe(); - // Blocking wait with an optional timeout for the session to close when the - // record_callback or the exception_callback return Stop. - void BlockForStop(); - KeepGoing BlockForStop(std::chrono::milliseconds timeout); - - private: - friend LiveBuilder; - - struct Impl; - - static void ProcessingThread(Impl* impl, MetadataCallback&& metadata_callback, - RecordCallback&& record_callback, - ExceptionCallback&& exception_callback); - static ExceptionAction ExceptionHandler( - Impl* impl, const ExceptionCallback& exception_callback, - const std::exception& exc, std::string_view pretty_function_name, - std::string_view message); - - LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size); - LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, - std::string gateway, std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::optional heartbeat_interval, - std::size_t buffer_size); - - // unique_ptr to be movable - std::unique_ptr impl_; - detail::ScopedThread thread_; -}; -} // namespace databento From 930229529c97d281b59e71ac77f0ebf9998bdaa0 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 17 Feb 2026 12:32:41 -0600 Subject: [PATCH 7/7] VER: Release 0.48.0 --- CHANGELOG.md | 4 ++-- CMakeLists.txt | 2 +- pkg/PKGBUILD | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03d16a5..8f0ef69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.47.1 - TBD +## 0.48.0 - 2026-02-18 ### Enhancements - Added `SlowReaderBehavior` enum and `LiveBuilder::SetSlowReaderBehavior()` to configure @@ -15,7 +15,7 @@ ### Enhancements - Added Zstd compression support to live clients which can be enabled with -`LiveBuilder::SetCompression()`. It's disabled by default + `LiveBuilder::SetCompression()`. It's disabled by default - Added `Compression()` getter to `LiveBlocking` and `LiveThreaded` - Upgraded default `httplib` version to 0.30.1 diff --git a/CMakeLists.txt b/CMakeLists.txt index f8035a4..86bb584 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2) project( databento - VERSION 0.47.0 + VERSION 0.48.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index 6a49a58..00b5057 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.47.0 +pkgver=0.48.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any')