diff --git a/doc/09-object-types.md b/doc/09-object-types.md index ca45194ac59..9833d159d49 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1218,7 +1218,6 @@ Configuration Attributes: port | Number | **Required.** Elasticsearch port. Defaults to `9200`. index | String | **Required.** Prefix for the index names. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`. username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy. @@ -1314,7 +1313,6 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`. port | Number | **Optional.** GELF receiver port. Defaults to `12201`. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`. source | String | **Optional.** Source name for this instance. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1345,7 +1343,6 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`. port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`. host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`. service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`. enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`. @@ -1686,7 +1683,6 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1750,7 +1746,6 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1865,7 +1860,6 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`. port | Number | **Optional.** OpenTSDB port. Defaults to `4242`. - diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`. host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags). Defaults to an `empty Dictionary`. diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index c867911be4a..5ce1635ddcc 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -19,7 +19,6 @@ set(perfdata_SOURCES influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp - perfdatawriterconnection.cpp perfdatawriterconnection.hpp ) if(ICINGA2_WITH_OPENTELEMETRY) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index 9f5108ddd8d..5d20cb1d65d 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/elasticsearchwriter.hpp" -#include "base/defer.hpp" #include "perfdata/elasticsearchwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/compatutility.hpp" @@ -10,14 +9,30 @@ #include "icinga/macroprocessor.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/tcpsocket.hpp" #include "base/stream.hpp" #include "base/base64.hpp" #include "base/json.hpp" #include "base/utility.hpp" +#include "base/networkstream.hpp" #include "base/perfdatavalue.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -71,25 +86,12 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array:: status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); } -void ElasticsearchWriter::Start(bool runtimeCreated) -{ - ObjectImpl::Start(runtimeCreated); - - if (GetEnableTls()) { - try { - m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { - Log(LogCritical, "ElasticsearchWriter") - << "Unable to create SSL context: " << ex.what(); - throw; - } - } -} - void ElasticsearchWriter::Resume() { ObjectImpl::Resume(); + m_EventPrefix = "icinga2.event."; + Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' resumed."; @@ -102,8 +104,6 @@ void ElasticsearchWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; - /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -128,17 +128,12 @@ void ElasticsearchWriter::Pause() m_HandleNotifications.disconnect(); m_FlushTimer->Stop(true); + m_WorkQueue.Join(); - std::promise queueDonePromise; - m_WorkQueue.Enqueue([&]() { + { + std::unique_lock lock (m_DataBufferMutex); Flush(); - queueDonePromise.set_value(); - }, PriorityLow); - - auto timeout = std::chrono::duration{GetDisconnectTimeout()}; - m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); - - m_WorkQueue.Join(); + } Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' paused."; @@ -282,10 +277,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -325,10 +316,6 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -379,10 +366,6 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "ElasticsearchWriter") @@ -404,10 +387,15 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& { AssertOnWorkQueue(); + /* Atomically buffer the data point. */ + std::unique_lock lock(m_DataBufferMutex); + /* Format the timestamps to dynamically select the date datatype inside the index. */ fields->Set("@timestamp", FormatTimestamp(ts)); fields->Set("timestamp", FormatTimestamp(ts)); - fields->Set("type", "icinga2.event." + type); + + String eventType = m_EventPrefix + type; + fields->Set("type", eventType); /* Every payload needs a line describing the index. * We do it this way to avoid problems with a near full queue. @@ -428,21 +416,19 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& } } -/** - * Queues a Flush on the work-queue if there isn't one queued already. - */ void ElasticsearchWriter::FlushTimeout() { - if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { - return; - } + /* Prevent new data points from being added to the array, there is a + * race condition where they could disappear. + */ + std::unique_lock lock(m_DataBufferMutex); - m_WorkQueue.Enqueue([&]() { - Defer resetFlushTimer{ - [&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); } - }; + /* Flush if there are any data available. */ + if (m_DataBuffer.size() > 0) { + Log(LogDebug, "ElasticsearchWriter") + << "Timer expired writing " << m_DataBuffer.size() << " data points"; Flush(); - }); + } } void ElasticsearchWriter::Flush() @@ -488,6 +474,22 @@ void ElasticsearchWriter::SendRequest(const String& body) url->SetPath(path); + OptionalTlsStream stream; + + try { + stream = Connect(); + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticsearchWriter") + << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false); + return; + } + + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); + http::request request (http::verb::post, std::string(url->Format(true)), 10); request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); @@ -517,14 +519,37 @@ void ElasticsearchWriter::SendRequest(const String& body) << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << " to '" << url->Format() << "'."; - decltype(m_Connection->Send(request)) response; try { - response = m_Connection->Send(request); - } catch (const PerfdataWriterConnection::Stopped& ex) { - Log(LogDebug, "ElasticsearchWriter") << ex.what(); - return; + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + http::parser parser; + beast::flat_buffer buf; + + try { + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticsearchWriter") + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); + throw; } + auto& response (parser.get()); + if (response.result_int() > 299) { if (response.result() == http::status::unauthorized) { /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ @@ -572,6 +597,66 @@ void ElasticsearchWriter::SendRequest(const String& body) } } +OptionalTlsStream ElasticsearchWriter::Connect() +{ + Log(LogNotice, "ElasticsearchWriter") + << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + + OptionalTlsStream stream; + bool tls = GetEnableTls(); + + if (tls) { + Shared::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Unable to create SSL context."; + throw; + } + + stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + stream.second = Shared::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (tls) { + auto& tlsStream (stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; + throw; + } + + if (!GetInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + return stream; +} + void ElasticsearchWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp index e9998d2bffb..8282bbdb028 100644 --- a/lib/perfdata/elasticsearchwriter.hpp +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -5,10 +5,11 @@ #define ELASTICSEARCHWRITER_H #include "perfdata/elasticsearchwriter-ti.hpp" -#include "icinga/checkable.hpp" +#include "icinga/service.hpp" #include "base/configobject.hpp" #include "base/workqueue.hpp" -#include "perfdata/perfdatawriterconnection.hpp" +#include "base/timer.hpp" +#include "base/tlsstream.hpp" namespace icinga { @@ -29,18 +30,16 @@ class ElasticsearchWriter final : public ObjectImpl protected: void OnConfigLoaded() override; void OnAllConfigLoaded() override; - void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: + String m_EventPrefix; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications; Timer::Ptr m_FlushTimer; - std::atomic_bool m_FlushTimerInQueue{false}; std::vector m_DataBuffer; - Shared::Ptr m_SslContext; - PerfdataWriterConnection::Ptr m_Connection; + std::mutex m_DataBufferMutex; void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -53,6 +52,7 @@ class ElasticsearchWriter final : public ObjectImpl void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts); + OptionalTlsStream Connect(); void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); void FlushTimeout(); diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti index c5e7bc3e1d1..4ed97fedb73 100644 --- a/lib/perfdata/elasticsearchwriter.ti +++ b/lib/perfdata/elasticsearchwriter.ti @@ -43,7 +43,7 @@ class ElasticsearchWriter : ConfigObject [config] double disconnect_timeout { default {{{ return 10; }}} }; - [config] double flush_interval { + [config] int flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index 6f8567f7073..c2bff71dde7 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -6,19 +6,28 @@ #include "icinga/service.hpp" #include "icinga/notification.hpp" #include "icinga/checkcommand.hpp" +#include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" +#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" +#include "base/application.hpp" #include "base/stream.hpp" +#include "base/networkstream.hpp" #include "base/context.hpp" #include "base/exception.hpp" #include "base/json.hpp" #include "base/statsfunction.hpp" #include #include +#include "base/io-engine.hpp" +#include +#include +#include +#include using namespace icinga; @@ -53,7 +62,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", gelfwriter->m_Connection->IsConnected() }, + { "connected", gelfwriter->GetConnected() }, { "source", gelfwriter->GetSource() } })); @@ -64,22 +73,6 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf status->Set("gelfwriter", new Dictionary(std::move(nodes))); } -void GelfWriter::Start(bool runtimeCreated) -{ - ObjectImpl::Start(runtimeCreated); - - /* Initialize connection */ - if (GetEnableTls()) { - try { - m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Unable to create SSL context."; - throw; - } - } -} - void GelfWriter::Resume() { ObjectImpl::Resume(); @@ -90,7 +83,12 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; + /* Timer for reconnecting */ + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -115,15 +113,18 @@ void GelfWriter::Pause() m_HandleNotifications.disconnect(); m_HandleStateChanges.disconnect(); - std::promise queueDonePromise; - - m_WorkQueue.Enqueue([&]() { - queueDonePromise.set_value(); - }, PriorityLow); + m_ReconnectTimer->Stop(true); - auto timeout = std::chrono::duration{GetDisconnectTimeout()}; - m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + m_WorkQueue.Enqueue([this]() { + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GelfWriter") + << "Unable to connect, not flushing buffers. Data may be lost."; + } + }, PriorityImmediate); + m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow); m_WorkQueue.Join(); Log(LogInformation, "GelfWriter") @@ -141,6 +142,126 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false); Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true); + + DisconnectInternal(); +} + +void GelfWriter::Reconnect() +{ + AssertOnWorkQueue(); + + if (IsPaused()) { + SetConnected(false); + return; + } + + ReconnectInternal(); +} + +void GelfWriter::ReconnectInternal() +{ + double startTime = Utility::GetTime(); + + CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'"); + + SetShouldConnect(true); + + if (GetConnected()) + return; + + Log(LogNotice, "GelfWriter") + << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; + + bool ssl = GetEnableTls(); + + if (ssl) { + Shared::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Unable to create SSL context."; + throw; + } + + m_Stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + m_Stream.second = Shared::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'"; + throw; + } + + if (ssl) { + auto& tlsStream (m_Stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "TLS handshake with host '" << GetHost() << " failed.'"; + throw; + } + + if (!GetInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + SetConnected(true); + + Log(LogInformation, "GelfWriter") + << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +void GelfWriter::ReconnectTimerHandler() +{ + m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal); +} + +void GelfWriter::Disconnect() +{ + AssertOnWorkQueue(); + + DisconnectInternal(); +} + +void GelfWriter::DisconnectInternal() +{ + if (!GetConnected()) + return; + + if (m_Stream.first) { + boost::system::error_code ec; + m_Stream.first->next_layer().shutdown(ec); + + // https://stackoverflow.com/a/25703699 + // As long as the error code's category is not an SSL category, then the protocol was securely shutdown + if (ec.category() == boost::asio::error::get_ssl_category()) { + Log(LogCritical, "GelfWriter") + << "TLS shutdown with host '" << GetHost() << "' could not be done securely."; + } + } else if (m_Stream.second) { + m_Stream.second->close(); + } + + SetConnected(false); + } void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -177,10 +298,6 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_command", checkCommand->GetName()); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -288,10 +405,6 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti fields->Set("_check_command", checkable->GetCheckCommand()->GetName()); m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -334,10 +447,6 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_source", cr->GetCheckSource()); m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("GELF Processing state change '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -364,15 +473,26 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g msgbuf << gelfMessage; msgbuf << '\0'; - auto log = msgbuf.str(); + String log = msgbuf.str(); + + if (!GetConnected()) + return; try { Log(LogDebug, "GelfWriter") << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; - m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()}); - } catch (const PerfdataWriterConnection::Stopped& ex) { - Log(LogDebug, "GelfWriter") << ex.what(); - return; + if (m_Stream.first) { + boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str())); + m_Stream.first->flush(); + } else { + boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str())); + m_Stream.second->flush(); + } + } catch (const std::exception& ex) { + Log(LogCritical, "GelfWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + + throw ex; } } diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index f7d2a10c339..e24b6e6ea69 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -5,10 +5,12 @@ #define GELFWRITER_H #include "perfdata/gelfwriter-ti.hpp" -#include "perfdata/perfdatawriterconnection.hpp" -#include "icinga/checkable.hpp" +#include "icinga/service.hpp" #include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" #include "base/workqueue.hpp" +#include namespace icinga { @@ -28,16 +30,15 @@ class GelfWriter final : public ObjectImpl protected: void OnConfigLoaded() override; - void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - PerfdataWriterConnection::Ptr m_Connection; + OptionalTlsStream m_Stream; WorkQueue m_WorkQueue{10000000, 1}; - Shared::Ptr m_SslContext; boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges; + Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr, @@ -47,6 +48,13 @@ class GelfWriter final : public ObjectImpl String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); + void ReconnectTimerHandler(); + + void Disconnect(); + void DisconnectInternal(); + void Reconnect(); + void ReconnectInternal(); + void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index 46c194d1a4a..ef9a030a6a3 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -25,6 +25,10 @@ class GelfWriter : ConfigObject default {{{ return false; }}} }; + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} }; diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index e00cd927589..652b7d3d1f9 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -7,13 +7,16 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" +#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" +#include "base/application.hpp" #include "base/stream.hpp" +#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -62,7 +65,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", graphitewriter->m_Connection->IsConnected() } + { "connected", graphitewriter->GetConnected() } })); perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); @@ -85,7 +88,12 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; + /* Timer for reconnecting */ + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -100,17 +108,20 @@ void GraphiteWriter::Resume() void GraphiteWriter::Pause() { m_HandleCheckResults.disconnect(); + m_ReconnectTimer->Stop(true); - std::promise queueDonePromise; - - m_WorkQueue.Enqueue([&]() { - queueDonePromise.set_value(); - }, PriorityLow); + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; - auto timeout = std::chrono::duration{GetDisconnectTimeout()}; - m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + ObjectImpl::Pause(); + return; + } m_WorkQueue.Join(); + DisconnectInternal(); Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' paused."; @@ -139,6 +150,105 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "GraphiteWriter") << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); + + if (GetConnected()) { + m_Stream->close(); + + SetConnected(false); + } +} + +/** + * Reconnect method, stops when the feature is paused in HA zones. + * + * Called inside the WQ. + */ +void GraphiteWriter::Reconnect() +{ + AssertOnWorkQueue(); + + if (IsPaused()) { + SetConnected(false); + return; + } + + ReconnectInternal(); +} + +/** + * Reconnect method, connects to a TCP Stream + */ +void GraphiteWriter::ReconnectInternal() +{ + double startTime = Utility::GetTime(); + + CONTEXT("Reconnecting to Graphite '" << GetName() << "'"); + + SetShouldConnect(true); + + if (GetConnected()) + return; + + Log(LogNotice, "GraphiteWriter") + << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; + + m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); + + try { + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "GraphiteWriter") + << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'"; + + SetConnected(false); + + throw; + } + + SetConnected(true); + + Log(LogInformation, "GraphiteWriter") + << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +/** + * Reconnect handler called by the timer. + * + * Enqueues a reconnect task into the WQ. + */ +void GraphiteWriter::ReconnectTimerHandler() +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); +} + +/** + * Disconnect the stream. + * + * Called inside the WQ. + */ +void GraphiteWriter::Disconnect() +{ + AssertOnWorkQueue(); + + DisconnectInternal(); +} + +/** + * Disconnect the stream. + * + * Called outside the WQ. + */ +void GraphiteWriter::DisconnectInternal() +{ + if (!GetConnected()) + return; + + m_Stream->close(); + + SetConnected(false); } /** @@ -192,12 +302,12 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C } m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + /* TODO: Deal with missing connection here. Needs refactoring + * into parsing the actual performance data and then putting it + * into a queue for re-inserting. */ + for (auto& [name, val] : metadata) { SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); } @@ -284,11 +394,19 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; - try { - m_Connection->Send(asio::buffer(msgbuf.str())); - } catch (const PerfdataWriterConnection::Stopped& ex) { - Log(LogDebug, "GraphiteWriter") << ex.what(); + std::unique_lock lock(m_StreamMutex); + + if (!GetConnected()) return; + + try { + asio::write(*m_Stream, asio::buffer(msgbuf.str())); + m_Stream->flush(); + } catch (const std::exception& ex) { + Log(LogCritical, "GraphiteWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + + throw ex; } } diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 470fcc07dac..b28db817270 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -5,10 +5,13 @@ #define GRAPHITEWRITER_H #include "perfdata/graphitewriter-ti.hpp" -#include "icinga/checkable.hpp" +#include "icinga/service.hpp" #include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" #include "base/workqueue.hpp" -#include "perfdata/perfdatawriterconnection.hpp" +#include +#include namespace icinga { @@ -35,10 +38,12 @@ class GraphiteWriter final : public ObjectImpl void Pause() override; private: - PerfdataWriterConnection::Ptr m_Connection; + Shared::Ptr m_Stream; + std::mutex m_StreamMutex; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); @@ -47,6 +52,13 @@ class GraphiteWriter final : public ObjectImpl static String EscapeMetricLabel(const String& str); static Value EscapeMacroMetric(const Value& value); + void ReconnectTimerHandler(); + + void Disconnect(); + void DisconnectInternal(); + void Reconnect(); + void ReconnectInternal(); + void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index f0d9bfb8056..d9b79c10da1 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -27,6 +27,10 @@ class GraphiteWriter : ConfigObject [config] bool enable_send_thresholds; [config] bool enable_send_metadata; + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} }; diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 5b6f376de9b..47d9455737c 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/influxdbcommonwriter.hpp" -#include "base/defer.hpp" #include "perfdata/influxdbcommonwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/service.hpp" @@ -10,15 +9,36 @@ #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/stream.hpp" #include "base/json.hpp" +#include "base/networkstream.hpp" #include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include "base/tlsutility.hpp" #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include +#include #include #include @@ -60,21 +80,6 @@ void InfluxdbCommonWriter::OnConfigLoaded() } } -void InfluxdbCommonWriter::Start(bool runtimeCreated) -{ - ObjectImpl::Start(runtimeCreated); - - if (GetSslEnable()) { - try { - m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (const std::exception& ex) { - Log(LogCritical, GetReflectionType()->GetName()) - << "Unable to create SSL context: " << ex.what(); - throw; - } - } -} - void InfluxdbCommonWriter::Resume() { ObjectImpl::Resume(); @@ -92,8 +97,6 @@ void InfluxdbCommonWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()}; - /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -111,15 +114,7 @@ void InfluxdbCommonWriter::Pause() << "Processing pending tasks and flushing data buffers."; m_FlushTimer->Stop(true); - - std::promise queueDonePromise; - m_WorkQueue.Enqueue([&]() { - FlushWQ(); - queueDonePromise.set_value(); - }, PriorityLow); - - auto timeout = std::chrono::duration{GetDisconnectTimeout()}; - m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); @@ -141,6 +136,68 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, GetReflectionType()->GetName()) << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); + + //TODO: Close the connection, if we keep it open. +} + +OptionalTlsStream InfluxdbCommonWriter::Connect() +{ + Log(LogNotice, GetReflectionType()->GetName()) + << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + + OptionalTlsStream stream; + bool ssl = GetSslEnable(); + + if (ssl) { + Shared::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unable to create SSL context."; + throw; + } + + stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + stream.second = Shared::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (ssl) { + auto& tlsStream (stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "TLS handshake with host '" << GetHost() << "' failed."; + throw; + } + + if (!GetSslInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + return stream; } void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -204,10 +261,6 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c } m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() { - if (m_Connection->IsStopped()) { - return; - } - CONTEXT("Processing check result for '" << checkable->GetName() << "'"); double ts = cr->GetExecutionEnd(); @@ -358,19 +411,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic } } -/** - * Queues a Flush on the work-queue and restarts the timer. - */ void InfluxdbCommonWriter::FlushTimeout() { - if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { - return; - } + m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); +} - m_WorkQueue.Enqueue([&]() { - Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; - FlushWQ(); - }); +void InfluxdbCommonWriter::FlushTimeoutWQ() +{ + AssertOnWorkQueue(); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + + FlushWQ(); } void InfluxdbCommonWriter::FlushWQ() @@ -391,16 +444,55 @@ void InfluxdbCommonWriter::FlushWQ() m_DataBuffer.clear(); m_DataBufferSize = 0; - auto request (AssembleRequest(std::move(body))); + OptionalTlsStream stream; - decltype(m_Connection->Send(request)) response; try { - response = m_Connection->Send(request); - } catch (const PerfdataWriterConnection::Stopped& ex) { - Log(LogDebug, GetReflectionType()->GetName()) << ex.what(); + stream = Connect(); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); return; } + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); + + auto request (AssembleRequest(std::move(body))); + + try { + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + http::parser parser; + beast::flat_buffer buf; + + try { + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); + throw; + } + + auto& response (parser.get()); + if (response.result() != http::status::no_content) { Log(LogCritical, GetReflectionType()->GetName()) << "Unexpected response code: " << response.result() << ", InfluxDB error message:\n" << response.body(); diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index cfda0250133..35caa2f257c 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -5,13 +5,18 @@ #define INFLUXDBCOMMONWRITER_H #include "perfdata/influxdbcommonwriter-ti.hpp" -#include "icinga/checkable.hpp" +#include "icinga/service.hpp" #include "base/configobject.hpp" #include "base/perfdatavalue.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include "remote/url.hpp" -#include "perfdata/perfdatawriterconnection.hpp" +#include +#include #include +#include namespace icinga { @@ -34,7 +39,6 @@ class InfluxdbCommonWriter : public ObjectImpl protected: void OnConfigLoaded() override; - void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; @@ -46,22 +50,22 @@ class InfluxdbCommonWriter : public ObjectImpl private: boost::signals2::connection m_HandleCheckResults; Timer::Ptr m_FlushTimer; - std::atomic_bool m_FlushTimerInQueue{false}; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; std::atomic_size_t m_DataBufferSize{0}; - Shared::Ptr m_SslContext; - PerfdataWriterConnection::Ptr m_Connection; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); + void FlushTimeoutWQ(); void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); + OptionalTlsStream Connect(); + void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti index 7074bceaf29..0acf39740c8 100644 --- a/lib/perfdata/influxdbcommonwriter.ti +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -61,7 +61,7 @@ abstract class InfluxdbCommonWriter : ConfigObject [config] bool enable_send_metadata { default {{{ return false; }}} }; - [config] double flush_interval { + [config] int flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 1b2f82a7d9c..00263979237 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -7,12 +7,17 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" +#include "icinga/compatutility.hpp" +#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" +#include "base/utility.hpp" #include "base/perfdatavalue.hpp" +#include "base/application.hpp" #include "base/stream.hpp" +#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -31,8 +36,6 @@ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); - m_WorkQueue.SetName("OpenTsdbWriter, " + GetName()); - if (!GetEnableHa()) { Log(LogDebug, "OpenTsdbWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); @@ -48,26 +51,14 @@ void OpenTsdbWriter::OnConfigLoaded() * * @param status Key value pairs for feature stats */ -void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { - size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); - double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; - - nodes.emplace_back( - opentsdbwriter->GetName(), - new Dictionary({ - { "connected", opentsdbwriter->m_Connection->IsConnected() }, - {"work_queue_items", workQueueItems}, - {"work_queue_item_rate", workQueueItemRate} - } - ) - ); - - perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems)); - perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ + { "connected", opentsdbwriter->GetConnected() } + })); } status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); @@ -83,14 +74,13 @@ void OpenTsdbWriter::Resume() Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' resumed."; - m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) { - Log(LogDebug, "OpenTsdbWriter") - << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); - }); - ReadConfigTemplate(); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); @@ -103,22 +93,58 @@ void OpenTsdbWriter::Resume() void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); + m_ReconnectTimer->Stop(true); - std::promise queueDonePromise; + Log(LogInformation, "OpentsdbWriter") + << "'" << GetName() << "' paused."; - m_WorkQueue.Enqueue([&]() { - queueDonePromise.set_value(); - }, PriorityLow); + m_Stream->close(); - auto timeout = std::chrono::duration{GetDisconnectTimeout()}; - m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + SetConnected(false); - m_WorkQueue.Join(); + ObjectImpl::Pause(); +} - Log(LogInformation, "OpentsdbWriter") - << "'" << GetName() << "' paused."; +/** + * Reconnect handler called by the timer. + * Handles TLS + */ +void OpenTsdbWriter::ReconnectTimerHandler() +{ + if (IsPaused()) + return; - ObjectImpl::Pause(); + SetShouldConnect(true); + + if (GetConnected()) + return; + + double startTime = Utility::GetTime(); + + Log(LogNotice, "OpenTsdbWriter") + << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; + + /* + * We're using telnet as input method. Future PRs may change this into using the HTTP API. + * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet + */ + m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); + + try { + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "OpenTsdbWriter") + << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + + SetConnected(false); + + return; + } + + SetConnected(true); + + Log(LogInformation, "OpenTsdbWriter") + << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; } /** @@ -225,7 +251,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_hostName = EscapeTag(host->GetName()); tags["host"] = escaped_hostName; - std::vector> metadata; + double ts = cr->GetExecutionEnd(); if (service) { @@ -236,55 +262,40 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_serviceName = EscapeMetric(serviceName); metric = "icinga.service." + escaped_serviceName; } + + SendMetric(checkable, metric + ".state", tags, service->GetState(), ts); - metadata.emplace_back("state", service->GetState()); } else { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { metric = "icinga.host"; } - metadata.emplace_back("state", host->GetState()); + SendMetric(checkable, metric + ".state", tags, host->GetState(), ts); } - metadata.emplace_back("state_type", checkable->GetStateType()); - metadata.emplace_back("reachable", checkable->IsReachable()); - metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth()); - metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); + SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts); + SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts); + SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts); + SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts); - m_WorkQueue.Enqueue( - [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { - if (m_Connection->IsStopped()) { - return; - } - - double ts = cr->GetExecutionEnd(); - - for (auto& [name, val] : metadata) { - AddMetric(checkable, metric + "." + name, tags, val, ts); - } + SendPerfdata(checkable, metric, tags, cr, ts); - AddPerfdata(checkable, metric, tags, cr, ts); - - metric = "icinga.check"; - - if (service) { - tags["type"] = "service"; - String serviceName = service->GetShortName(); - String escaped_serviceName = EscapeTag(serviceName); - tags["service"] = escaped_serviceName; - } else { - tags["type"] = "host"; - } + metric = "icinga.check"; - AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); - AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); - AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); - AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + if (service) { + tags["type"] = "service"; + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeTag(serviceName); + tags["service"] = escaped_serviceName; + } else { + tags["type"] = "host"; + } - SendMsgBuffer(); - } - ); + SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); + SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); + SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); + SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); } /** @@ -296,11 +307,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C * @param cr Check result containing performance data * @param ts Timestamp when the check result was received */ -void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { - ASSERT(m_WorkQueue.IsWorkerThread()); - Array::Ptr perfdata = cr->GetPerformanceData(); if (!perfdata) @@ -341,21 +350,21 @@ void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& tags_new["label"] = escaped_key; } - AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); + SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); if (!pdv->GetCrit().IsEmpty()) - AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); + SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) - AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); + SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) - AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); + SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) - AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); + SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); } } /** - * Add given metric to the data buffer to be later sent to OpenTSDB + * Send given metric to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name @@ -363,11 +372,9 @@ void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& * @param value Floating point metric value * @param ts Timestamp where the metric was received from the check result */ -void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { - ASSERT(m_WorkQueue.IsWorkerThread()); - String tags_string = ""; for (auto& tag : tags) { @@ -387,21 +394,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me /* do not send \n to debug log */ msgbuf << "\n"; - m_MsgBuf.append(msgbuf.str()); -} + String put = msgbuf.str(); -void OpenTsdbWriter::SendMsgBuffer() -{ - ASSERT(m_WorkQueue.IsWorkerThread()); + ObjectLock olock(this); - Log(LogDebug, "OpenTsdbWriter") - << "Flushing data buffer to OpenTsdb."; + if (!GetConnected()) + return; try { - m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); - } catch (const PerfdataWriterConnection::Stopped& ex) { - Log(LogDebug, "OpenTsdbWriter") << ex.what(); - return; + Log(LogDebug, "OpenTsdbWriter") + << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'."; + + boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); + m_Stream->flush(); + } catch (const std::exception& ex) { + Log(LogCritical, "OpenTsdbWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; } } diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index 5db2985400a..cd3f2efc493 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -5,9 +5,11 @@ #define OPENTSDBWRITER_H #include "perfdata/opentsdbwriter-ti.hpp" -#include "icinga/checkable.hpp" +#include "icinga/service.hpp" #include "base/configobject.hpp" -#include "perfdata/perfdatawriterconnection.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include namespace icinga { @@ -34,24 +36,24 @@ class OpenTsdbWriter final : public ObjectImpl void Pause() override; private: - WorkQueue m_WorkQueue{10000000, 1}; - std::string m_MsgBuf; - PerfdataWriterConnection::Ptr m_Connection; + Shared::Ptr m_Stream; boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_ReconnectTimer; Dictionary::Ptr m_ServiceConfigTemplate; Dictionary::Ptr m_HostConfigTemplate; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void AddMetric(const Checkable::Ptr& checkable, const String& metric, + void SendMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); - void SendMsgBuffer(); - void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, + void SendPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); static String EscapeTag(const String& str); static String EscapeMetric(const String& str); + void ReconnectTimerHandler(); + void ReadConfigTemplate(); }; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index dcad571682b..0c2daf51e56 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,6 +31,11 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; + + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} }; diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp deleted file mode 100644 index 46000c28f47..00000000000 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ /dev/null @@ -1,209 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "perfdata/perfdatawriterconnection.hpp" -#include "base/tcpsocket.hpp" -#include -#include -#include -#include - -using namespace icinga; -using HttpResponse = PerfdataWriterConnection::HttpResponse; - -PerfdataWriterConnection::PerfdataWriterConnection( - const ConfigObject::Ptr& parent, - String host, - String port, - Shared::Ptr sslContext, - bool verifyPeerCertificate -) - : PerfdataWriterConnection( - parent->GetReflectionType()->GetName(), - parent->GetName(), - std::move(host), - std::move(port), - std::move(sslContext), - verifyPeerCertificate - ) {}; - -PerfdataWriterConnection::PerfdataWriterConnection( - String logFacility, - String parentName, - String host, - String port, - Shared::Ptr sslContext, - bool verifyPeerCertificate -) - : m_VerifyPeerCertificate(verifyPeerCertificate), - m_SslContext(std::move(sslContext)), - m_LogFacility(std::move(logFacility)), - m_ParentName(std::move(parentName)), - m_Host(std::move(host)), - m_Port(std::move(port)), - m_ReconnectTimer(IoEngine::Get().GetIoContext()), - m_Strand(IoEngine::Get().GetIoContext()), - m_Stream(MakeStream()) -{ -} - -/** - * Get the current state of the connection. - */ -bool PerfdataWriterConnection::IsConnected() const -{ - return m_Connected; -} - -bool PerfdataWriterConnection::IsStopped() const -{ - return m_Stopped; -} - -void PerfdataWriterConnection::Disconnect() -{ - if (m_Stopped.exchange(true, std::memory_order_relaxed)) { - return; - } - - std::promise promise; - - IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { - try { - /* Cancel any outstanding operations of the other coroutine. - * Since we're on the same strand we're hopefully guaranteed that all cancellations - * result in exceptions thrown by the yield_context, even if its already queued for - * completion. - */ - std::visit( - [](const auto& stream) { - if (stream->lowest_layer().is_open()) { - stream->lowest_layer().cancel(); - } - }, - m_Stream - ); - m_ReconnectTimer.cancel(); - - Disconnect(std::move(yc)); - promise.set_value(); - } catch (const std::exception& ex) { - promise.set_exception(std::current_exception()); - } - }); - - promise.get_future().get(); -} - -AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const -{ - AsioTlsOrTcpStream ret; - if (m_SslContext) { - ret = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext, m_Host); - } else { - ret = Shared::Make(IoEngine::Get().GetIoContext()); - } - - return ret; -} - -/** - * Wait for the next attempt after an error, using a backoff algorithm. - * - * The waits between retries are doubled for each failure, up to a maximum of 32s, until it is - * reset by a successful attempt. - */ -void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc) -{ - m_ReconnectTimer.expires_after(m_RetryTimeout); - if (m_RetryTimeout <= FinalRetryWait / 2) { - m_RetryTimeout *= 2; - } - m_ReconnectTimer.async_wait(yc); -} - -void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc) -{ - if (m_Connected) { - return; - } - - std::visit( - [&](auto& stream) { - ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); - - if constexpr (std::is_same_v, Shared::Ptr>) { - using type = boost::asio::ssl::stream_base::handshake_type; - - stream->next_layer().async_handshake(type::client, yc); - - if (m_VerifyPeerCertificate) { - if (!stream->next_layer().IsVerifyOK()) { - BOOST_THROW_EXCEPTION( - std::runtime_error{ - "TLS certificate validation failed: " + stream->next_layer().GetVerifyError() - } - ); - } - } - } - }, - m_Stream - ); - - m_Connected = true; -} - -void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) -{ - if (!m_Connected.exchange(false, std::memory_order_relaxed)) { - return; - } - - std::visit( - [&](auto& stream) { - if constexpr (std::is_same_v, Shared::Ptr>) { - stream->GracefulDisconnect(m_Strand, yc); - } else { - stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both); - stream->lowest_layer().close(); - } - }, - m_Stream - ); - - m_Stream = MakeStream(); -} - -void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc) -{ - std::visit( - [&](auto& stream) { - boost::asio::async_write(*stream, buf, yc); - stream->async_flush(yc); - }, - m_Stream - ); -} - -HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc) -{ - boost::beast::http::response response; - std::visit( - [&](auto& stream) { - boost::beast::http::request_serializer sr{request}; - boost::beast::http::async_write(*stream, sr, yc); - stream->async_flush(yc); - - boost::beast::flat_buffer buf; - boost::beast::http::async_read(*stream, buf, response, yc); - }, - m_Stream - ); - - if (!response.keep_alive()) { - Disconnect(yc); - } - - return response; -} diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp deleted file mode 100644 index 729878a2960..00000000000 --- a/lib/perfdata/perfdatawriterconnection.hpp +++ /dev/null @@ -1,157 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#pragma once - -#include "base/io-engine.hpp" -#include "base/tlsstream.hpp" -#include -#include -#include -#include -#include - -namespace icinga { - -/** - * Class handling the connection to the various Perfdata backends. - */ -class PerfdataWriterConnection final : public Object -{ - static constexpr auto InitialRetryWait = 50ms; - static constexpr auto FinalRetryWait = 32s; - -public: - DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection); - - struct Stopped : std::exception - { - [[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; } - }; - - using HttpRequest = boost::beast::http::request; - using HttpResponse = boost::beast::http::response; - - PerfdataWriterConnection( - const ConfigObject::Ptr& parent, - String host, - String port, - Shared::Ptr sslContext = nullptr, - bool verifyPeerCertificate = true - ); - - PerfdataWriterConnection( - String logFacility, - String parentName, - String host, - String port, - Shared::Ptr sslContext = nullptr, - bool verifyPeerCertificate = true - ); - - /** - * Send the given data buffer to the server. - * - * To support each Buffer type this function needs an overload of the WriteMessage method. - * If the selected WriteMessage functions returns something, Send() will return that result. - * - * @param buf The buffer to send - * @return the return value returned by the WriteMessage overload for Buffer, otherwise void - */ - template - auto Send(Buffer&& buf) - { - if (m_Stopped) { - BOOST_THROW_EXCEPTION(Stopped{}); - } - - using RetType = decltype(WriteMessage(std::declval(), std::declval())); - std::promise promise; - - IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { - while (true) { - try { - EnsureConnected(yc); - - if constexpr (std::is_void_v) { - WriteMessage(std::forward(buf), yc); - promise.set_value(); - } else { - promise.set_value(WriteMessage(std::forward(buf), yc)); - } - - m_RetryTimeout = InitialRetryWait; - return; - } catch (const std::exception& ex) { - if (m_Stopped) { - promise.set_exception(std::make_exception_ptr(Stopped{})); - return; - } - - Log(LogCritical, m_LogFacility) - << "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":" - << m_Port << "' for '" << m_ParentName << "': " << ex.what(); - - m_Stream = MakeStream(); - m_Connected = false; - - try { - BackoffWait(yc); - } catch (const std::exception&) { - promise.set_exception(std::make_exception_ptr(Stopped{})); - return; - } - } - } - }); - - return promise.get_future().get(); - } - - void Disconnect(); - - /** - * Cancels ongoing operations either after a timeout or a future became ready. - * - * This will disconnect and set a flag so that no further Send() requests are accepted. - * - * @param future The future to wait for - * @param timeout The timeout after which ongoing operations are canceled - */ - template - void CancelAfterTimeout(const std::future& future, const std::chrono::duration& timeout) - { - future.wait_for(timeout); - Disconnect(); - } - - bool IsConnected() const; - bool IsStopped() const; - -private: - AsioTlsOrTcpStream MakeStream() const; - void BackoffWait(const boost::asio::yield_context& yc); - void EnsureConnected(const boost::asio::yield_context& yc); - void Disconnect(boost::asio::yield_context yc); - - void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc); - HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc); - - std::atomic_bool m_Stopped{false}; - std::atomic_bool m_Connected{false}; - - bool m_VerifyPeerCertificate; - Shared::Ptr m_SslContext; - - String m_LogFacility; - String m_ParentName; - String m_Host; - String m_Port; - - std::chrono::milliseconds m_RetryTimeout{InitialRetryWait}; - boost::asio::steady_timer m_ReconnectTimer; - boost::asio::io_context::strand m_Strand; - AsioTlsOrTcpStream m_Stream; -}; - -} // namespace icinga diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 94296c38bd6..a13d748ddd8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -148,18 +148,6 @@ if(ICINGA2_WITH_OPENTELEMETRY) list(APPEND base_test_SOURCES $) endif() -if(ICINGA2_WITH_PERFDATA) - list(APPEND base_test_SOURCES - perfdata-elasticsearchwriter.cpp - perfdata-gelfwriter.cpp - perfdata-graphitewriter.cpp - perfdata-influxdbwriter.cpp - perfdata-opentsdbwriter.cpp - perfdata-perfdatawriterconnection.cpp - $ - ) -endif() - if(ICINGA2_UNITY_BUILD) mkunity_target(base test base_test_SOURCES) endif() diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index d01384fa813..1d1426efcd4 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -11,12 +11,6 @@ #include #include -#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout)) -#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout)) - -#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout)) -#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout)) - namespace icinga { class TestLogger : public Logger diff --git a/test/notification-notificationcomponent.cpp b/test/notification-notificationcomponent.cpp index a67882ab793..c2ef994a026 100644 --- a/test/notification-notificationcomponent.cpp +++ b/test/notification-notificationcomponent.cpp @@ -5,7 +5,6 @@ #include "base/defer.hpp" #include "remote/apilistener.hpp" #include "test/base-testloggerfixture.hpp" -#include "test/utils.hpp" #include "config/configcompiler.hpp" #include "notification/notificationcomponent.hpp" @@ -195,7 +194,22 @@ object NotificationComponent "nc" {} void ReceiveCheckResults(std::size_t num, ServiceState state) { - ::ReceiveCheckResults(m_Host, num, state); + StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); + + for (auto i = 0UL; i < num; ++i) { + CheckResult::Ptr cr = new CheckResult(); + + cr->SetState(state); + + double now = Utility::GetTime(); + cr->SetActive(false); + cr->SetScheduleStart(now); + cr->SetScheduleEnd(now); + cr->SetExecutionStart(now); + cr->SetExecutionEnd(now); + + BOOST_REQUIRE(m_Host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); + } } double GetLastNotificationTimestamp() { return m_Notification->GetLastNotification(); } diff --git a/test/perfdata-elasticsearchwriter.cpp b/test/perfdata-elasticsearchwriter.cpp deleted file mode 100644 index ac6abac8d76..00000000000 --- a/test/perfdata-elasticsearchwriter.cpp +++ /dev/null @@ -1,57 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include -#include "perfdata/elasticsearchwriter.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatawriterfixture.hpp" -#include "test/utils.hpp" - -using namespace icinga; - -BOOST_FIXTURE_TEST_SUITE(perfdata_elasticsearchwriter, PerfdataWriterFixture, - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -BOOST_AUTO_TEST_CASE(connect) -{ - ResumeWriter(); - - ReceiveCheckResults(1, ServiceState::ServiceCritical); - - Accept(); - auto resp = GetSplitDecodedRequestBody(); - SendResponse(); - - // ElasticsearchWriter wants to send the same message twice, once for the check result - // and once for the "state change". - resp = GetSplitDecodedRequestBody(); - SendResponse(); - - // Just some basic sanity tests. It's not important to check if everything is entirely - // correct here. - BOOST_REQUIRE_GT(resp->GetLength(), 1); - Dictionary::Ptr cr = resp->Get(1); - BOOST_CHECK(cr->Contains("@timestamp")); - BOOST_CHECK_EQUAL(cr->Get("check_command"), "dummy"); - BOOST_CHECK_EQUAL(cr->Get("host"), "h1"); - - PauseWriter(); -} - -BOOST_AUTO_TEST_CASE(pause_with_pending_work) -{ - ResumeWriter(); - - // Process check-results until the writer is stuck. - BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); - - // Now try to pause. - PauseWriter(); - - REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); - REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-gelfwriter.cpp b/test/perfdata-gelfwriter.cpp deleted file mode 100644 index 8da07bc4a69..00000000000 --- a/test/perfdata-gelfwriter.cpp +++ /dev/null @@ -1,48 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include -#include "perfdata/gelfwriter.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatawriterfixture.hpp" -#include "test/utils.hpp" - -using namespace icinga; - -BOOST_FIXTURE_TEST_SUITE(perfdata_gelfwriter, PerfdataWriterFixture, - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -BOOST_AUTO_TEST_CASE(connect) -{ - ResumeWriter(); - - ReceiveCheckResults(1, ServiceState::ServiceCritical); - - Accept(); - Dictionary::Ptr resp = JsonDecode(GetDataUntil('\0')); - - // Just some basic sanity tests. It's not important to check if everything is entirely - // correct here. - BOOST_CHECK_CLOSE(resp->Get("timestamp").Get(), Utility::GetTime(), 0.5); - BOOST_CHECK_EQUAL(resp->Get("_check_command"), "dummy"); - BOOST_CHECK_EQUAL(resp->Get("_hostname"), "h1"); - PauseWriter(); -} - -BOOST_AUTO_TEST_CASE(pause_with_pending_work) -{ - ResumeWriter(); - - // Process check-results until the writer is stuck. - BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); - - // Now stop reading and try to pause OpenTsdbWriter. - PauseWriter(); - - REQUIRE_LOG_MESSAGE("Connection stopped\\.", 1s); - REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-graphitewriter.cpp b/test/perfdata-graphitewriter.cpp deleted file mode 100644 index 9b5789fe219..00000000000 --- a/test/perfdata-graphitewriter.cpp +++ /dev/null @@ -1,47 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include -#include "base/perfdatavalue.hpp" -#include "perfdata/graphitewriter.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatawriterfixture.hpp" -#include "test/utils.hpp" - -using namespace icinga; - -BOOST_FIXTURE_TEST_SUITE(perfdata_graphitewriter, PerfdataWriterFixture, - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -BOOST_AUTO_TEST_CASE(connect) -{ - ResumeWriter(); - - ReceiveCheckResults(1, ServiceState::ServiceCritical); - - Accept(); - auto msg = GetDataUntil('\n'); - - // Just some basic sanity tests. It's not important to check if everything is entirely correct here. - std::string_view cmpStr{"icinga2.h1.host.dummy.perfdata.dummy.value 42"}; - BOOST_REQUIRE_EQUAL(msg.substr(0, cmpStr.length()), cmpStr); - PauseWriter(); -} - -BOOST_AUTO_TEST_CASE(pause_with_pending_work) -{ - ResumeWriter(); - - // Process check-results until the writer is stuck. - BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); - - // Now stop reading and try to pause OpenTsdbWriter. - PauseWriter(); - - REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); - REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-influxdbwriter.cpp b/test/perfdata-influxdbwriter.cpp deleted file mode 100644 index 43837b50fcf..00000000000 --- a/test/perfdata-influxdbwriter.cpp +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include -#include "perfdata/influxdb2writer.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatawriterfixture.hpp" - -using namespace icinga; - -BOOST_FIXTURE_TEST_SUITE(perfdata_influxdbwriter, PerfdataWriterFixture, - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -BOOST_AUTO_TEST_CASE(connect) -{ - ResumeWriter(); - - ReceiveCheckResults(1, ServiceState::ServiceCritical); - - Accept(); - auto req = GetSplitRequestBody(','); - SendResponse(boost::beast::http::status::no_content); - - // Just some basic sanity tests. It's not important to check if everything is entirely - // correct here. - BOOST_REQUIRE_EQUAL(req.size(), 3); - BOOST_CHECK_EQUAL(req[0], "dummy"); - BOOST_CHECK_EQUAL(req[1], "hostname=h1"); - std::string_view perfData = "metric=dummy value=42"; - BOOST_CHECK_EQUAL(req[2].substr(0, perfData.length()), perfData); - PauseWriter(); -} - -BOOST_AUTO_TEST_CASE(pause_with_pending_work) -{ - ResumeWriter(); - - // Process check-results until the writer is stuck. - BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); - - // Now try to pause. - PauseWriter(); - - REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); - REQUIRE_LOG_MESSAGE("'Influxdb2Writer' paused\\.", 1s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-opentsdbwriter.cpp b/test/perfdata-opentsdbwriter.cpp deleted file mode 100644 index c3ec47d6deb..00000000000 --- a/test/perfdata-opentsdbwriter.cpp +++ /dev/null @@ -1,53 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include -#include "base/perfdatavalue.hpp" -#include "perfdata/opentsdbwriter.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatawriterfixture.hpp" -#include "test/utils.hpp" - -using namespace icinga; - -BOOST_FIXTURE_TEST_SUITE(perfdata_opentsdbwriter, PerfdataWriterFixture, - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -BOOST_AUTO_TEST_CASE(connect) -{ - ResumeWriter(); - - ReceiveCheckResults(1, ServiceState::ServiceCritical); - - Accept(); - auto msg = GetDataUntil('\n'); - std::vector splitMsg; - boost::split(splitMsg, msg, boost::is_any_of(" ")); - - // Just some basic sanity tests. It's not important to check if everything is entirely correct here. - BOOST_REQUIRE_EQUAL(splitMsg.size(), 5); - BOOST_REQUIRE_EQUAL(splitMsg[0], "put"); - BOOST_REQUIRE_EQUAL(splitMsg[1], "icinga.host.state"); - BOOST_REQUIRE_CLOSE(boost::lexical_cast(splitMsg[2]), Utility::GetTime(), 1); - BOOST_REQUIRE_EQUAL(splitMsg[3], "1"); - BOOST_REQUIRE_EQUAL(splitMsg[4], "host=h1"); - PauseWriter(); -} - -BOOST_AUTO_TEST_CASE(pause_with_pending_work) -{ - ResumeWriter(); - - // Process check-results until the writer is stuck. - BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); - - // Now stop reading and try to pause OpenTsdbWriter. - PauseWriter(); - - REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); - REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatatargetfixture.hpp b/test/perfdata-perfdatatargetfixture.hpp deleted file mode 100644 index bac1c504de9..00000000000 --- a/test/perfdata-perfdatatargetfixture.hpp +++ /dev/null @@ -1,206 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#pragma once - -#include -#include "base/io-engine.hpp" -#include "base/json.hpp" -#include "base/tlsstream.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace icinga { - -/** - * A fixture that provides methods to simulate a perfdata target - */ -class PerfdataWriterTargetFixture -{ -public: - PerfdataWriterTargetFixture() - : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext())) - { - } - - explicit PerfdataWriterTargetFixture(const Shared::Ptr& sslCtx) - : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext(), *sslCtx)) - { - m_SslContext = sslCtx; - } - - explicit PerfdataWriterTargetFixture(AsioTlsOrTcpStream stream) - : m_Stream(std::move(stream)), - m_Acceptor( - IoEngine::Get().GetIoContext() - ) - { - boost::asio::ip::tcp::endpoint ep{boost::asio::ip::address_v4::loopback(), 0}; - m_Acceptor.open(ep.protocol()); - m_Acceptor.bind(ep); - } - - unsigned short GetPort() { return m_Acceptor.local_endpoint().port(); } - - void Listen() - { - m_Acceptor.listen(); - } - - void Accept() - { - Listen(); - BOOST_REQUIRE_NO_THROW( - std::visit([&](auto& stream) { return m_Acceptor.accept(stream->lowest_layer()); }, m_Stream) - ); - } - - void Handshake() - { - BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); - using handshake_type = UnbufferedAsioTlsStream::handshake_type; - auto& stream = std::get::Ptr>(m_Stream); - BOOST_REQUIRE_NO_THROW(stream->next_layer().handshake(handshake_type::server)); - BOOST_REQUIRE(stream->next_layer().IsVerifyOK()); - } - - void Shutdown() - { - BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); - auto& stream = std::get::Ptr>(m_Stream); - try { - stream->next_layer().shutdown(); - } catch (const std::exception& ex) { - if (const auto* se = dynamic_cast(&ex); - !se || se->code() != boost::asio::error::eof) { - BOOST_FAIL("Exception in shutdown(): " << ex.what()); - } - } - - ResetStream(); - } - - void ResetStream() - { - if (std::holds_alternative::Ptr>(m_Stream)) { - m_Stream = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext, "localhost"); - } else { - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - } - } - - /** - * Reads the HTTP request body from the stream, with an optional limit on the number of bytes to read. - * - * @param bytes The maximum number of bytes to read from the request body. If 0, there is no limit and the entire body will be read. - * - * @return The HTTP request read from the stream. - */ - boost::beast::http::request GetRequest(std::size_t bytes = 0) - { - using namespace boost::beast; - - boost::beast::flat_buffer buf; - if (bytes > 0) { - buf = boost::beast::flat_buffer{bytes}; - } - boost::system::error_code ec; - http::request_parser parser; - parser.body_limit(-1); - std::visit( - [&](auto& stream) { - http::read(*stream, buf, parser, ec); - }, - m_Stream - ); - if (bytes > 0) { - BOOST_REQUIRE_MESSAGE( - !ec || ec == http::error::buffer_overflow, - "Reading request body with a buffer limit of '" << bytes << - "' should either succeed or fail with a buffer_overflow error, but got: " << ec.message() - ); - } else { - BOOST_REQUIRE_MESSAGE(!ec, "Error while reading request body: " << ec.message()); - BOOST_REQUIRE_MESSAGE(parser.is_done(), "Parser did not finish reading the request, but no error was set."); - } - return parser.release(); - } - - auto GetSplitRequestBody(char delim) - { - auto request = GetRequest(); - std::vector result{}; - boost::split(result, request.body(), boost::is_any_of(std::string{delim})); - return result; - } - - auto GetSplitDecodedRequestBody() - { - Array::Ptr result = new Array; - for (const auto& line : GetSplitRequestBody('\n')) { - if (!line.empty()) { - result->Add(JsonDecode(line)); - } - } - return result; - } - - template - std::string GetDataUntil(T&& delim) - { - using namespace boost::asio::ip; - - std::size_t delimLength{1}; - if constexpr (!std::is_same_v, char>) { - delimLength = std::string_view{delim}.size(); - } - - boost::asio::streambuf buf; - boost::system::error_code ec; - auto bytesRead = std::visit( - [&](auto& stream) { return boost::asio::read_until(*stream, buf, std::forward(delim), ec); }, m_Stream - ); - BOOST_REQUIRE_MESSAGE(!ec, ec.message()); - - std::string ret{ - boost::asio::buffers_begin(buf.data()), boost::asio::buffers_begin(buf.data()) + bytesRead - delimLength - }; - buf.consume(bytesRead); - - return ret; - } - - void SendResponse(boost::beast::http::status status = boost::beast::http::status::ok) - { - using namespace boost::asio::ip; - using namespace boost::beast; - - boost::system::error_code ec; - http::response response; - response.result(status); - response.prepare_payload(); - std::visit( - [&](auto& stream) { - http::write(*stream, response, ec); - BOOST_REQUIRE_MESSAGE(!ec, ec.message()); - stream->flush(ec); - BOOST_REQUIRE_MESSAGE(!ec, ec.message()); - }, - m_Stream - ); - } - -private: - AsioTlsOrTcpStream m_Stream; - boost::asio::ip::tcp::acceptor m_Acceptor; - Shared::Ptr m_SslContext; -}; - -} // namespace icinga diff --git a/test/perfdata-perfdatawriterconnection.cpp b/test/perfdata-perfdatawriterconnection.cpp deleted file mode 100644 index 16ed299a947..00000000000 --- a/test/perfdata-perfdatawriterconnection.cpp +++ /dev/null @@ -1,335 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "perfdata/perfdatawriterconnection.hpp" -#include "test/perfdata-perfdatatargetfixture.hpp" -#include "test/remote-certificate-fixture.hpp" -#include "test/test-ctest.hpp" -#include "test/test-thread.hpp" -#include "test/utils.hpp" - -using namespace icinga; - -class TlsPerfdataWriterFixture : public CertificateFixture, public PerfdataWriterTargetFixture -{ -public: - TlsPerfdataWriterFixture() : PerfdataWriterTargetFixture(MakeContext("server")) - { - m_PdwSslContext = MakeContext("client"); - - m_Conn = new PerfdataWriterConnection{"Test", "test", "localhost", std::to_string(GetPort()), m_PdwSslContext}; - } - - auto& GetConnection() { return *m_Conn; } - - static inline const std::vector RequiredCerts{"client", "server"}; - -private: - Shared::Ptr MakeContext(const std::string& name) - { - auto testCert = GetCertFor(name); - return SetupSslContext( - testCert.crtFile, - testCert.keyFile, - m_CaCrtFile.string(), - "", - DEFAULT_TLS_CIPHERS, - DEFAULT_TLS_PROTOCOLMIN, - DebugInfo() - ); - } - - Shared::Ptr m_PdwSslContext; - PerfdataWriterConnection::Ptr m_Conn; -}; - -BOOST_FIXTURE_TEST_SUITE(perfdata_connection, TlsPerfdataWriterFixture, - *RequiresCertificate(TlsPerfdataWriterFixture::RequiredCerts) - *boost::unit_test::label("perfdata") - *boost::unit_test::label("network") -) - -/* If there is no acceptor listening on the other side, connecting should fail. - */ -BOOST_AUTO_TEST_CASE(connection_refused) -{ - std::promise p; - TestThread timeoutThread{[&]() { - auto f = p.get_future(); - GetConnection().CancelAfterTimeout(f, 50ms); - }}; - - BOOST_REQUIRE_THROW( - GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped - ); - - REQUIRE_JOINS_WITHIN(timeoutThread, 1s); -} - -/* The PerfdataWriterConnection connects automatically when sending the first data. - * In case of http we also need to support disconnecting and reconnecting. - */ -BOOST_AUTO_TEST_CASE(ensure_connected) -{ - std::promise disconnectedPromise; - - TestThread mockTargetThread{[&]() { - Accept(); - Handshake(); - auto ret = GetDataUntil('\0'); - Shutdown(); - disconnectedPromise.get_future().get(); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - }}; - - BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{"foobar", 7})); - BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); - disconnectedPromise.set_value(); - - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -/* Verify that data can still be sent while CancelAfterTimeout is waiting and the timeout - * can be aborted when all data has been sent successfully. - */ -BOOST_AUTO_TEST_CASE(finish_during_timeout) -{ - std::promise p; - - TestThread mockTargetThread{[&]() { - Accept(); - Handshake(); - auto ret = GetDataUntil('\0'); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - ret = GetDataUntil('\0'); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - // This is done here instead of the main thread after send, because we need to - // synchronize the asserts done in the timeoutThread after this point. - p.set_value(); - Shutdown(); - }}; - - GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); - - TestThread timeoutThread{[&]() { - auto f = p.get_future(); - GetConnection().CancelAfterTimeout(f, 50ms); - BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::ready); - BOOST_REQUIRE(!GetConnection().IsConnected()); - }}; - - GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); - - REQUIRE_JOINS_WITHIN(timeoutThread, 1s); - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -/* For the client, even a hanging server will accept the connection immediately, since it's done - * in the kernel. But in that case the TLS handshake will be stuck, so we need to verify that a - * handshake can be interrupted by CancelAfterTimeout(). - */ -BOOST_AUTO_TEST_CASE(stuck_in_handshake) -{ - std::promise p; - TestThread timeoutThread{[&]() { - Accept(); - auto f = p.get_future(); - GetConnection().CancelAfterTimeout(f, 50ms); - BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::timeout); - }}; - - BOOST_REQUIRE_THROW( - GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped - ); - - REQUIRE_JOINS_WITHIN(timeoutThread, 1s); -} - -/* When the disconnect timeout runs out while sending something to a slow or blocking server, we - * expect the send to be aborted after a timeout with an 'operation cancelled' exception, in - * order to not delay the shutdown of a perfdata writer indefinitely. - * No orderly TLS shutdown can be performed in this case, because the stream has been truncated. - * The server will need to handle this one on their own. - */ -BOOST_AUTO_TEST_CASE(stuck_sending) -{ - std::promise shutdownPromise; - std::promise dataReadPromise; - - TestThread mockTargetThread{[&]() { - Accept(); - Handshake(); - auto ret = GetDataUntil("#"); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - dataReadPromise.set_value(); - - // There's still a full buffer waiting to be read, but we're pretending to be dead and - // close the socket at this point. - shutdownPromise.get_future().get(); - ResetStream(); - }}; - - TestThread timeoutThread{[&]() { - // Synchronize with when mockTargetThread has read the initial data. - // This should especially help with timing on slow machines like the ARM GHA runners. - dataReadPromise.get_future().get(); - BOOST_REQUIRE(GetConnection().IsConnected()); - BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); - BOOST_REQUIRE(!GetConnection().IsConnected()); - }}; - - // Allocate a large string that will fill the buffers on both sides of the connection, in - // order to make Send() block. - auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); - auto buf = boost::asio::const_buffer{randomData.data(), randomData.size()}; - BOOST_REQUIRE_THROW(GetConnection().Send(buf), PerfdataWriterConnection::Stopped); - shutdownPromise.set_value(); - - REQUIRE_JOINS_WITHIN(timeoutThread, 1s); - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -/* This simulates a server that is stuck after receiving a HTTP request and before sending their - * response. Here, the simulated server is polite and still responds to a shutdown request, but - * in reality a server might not even do that. That case should be handled by our - * AsioTlsStream::GracefulDisconnect() function with an additional 10s timeout. - */ -BOOST_AUTO_TEST_CASE(stuck_reading_response) -{ - std::promise shutdownPromise; - std::promise requestReadPromise; - - TestThread mockTargetThread{[&]() { - Accept(); - Handshake(); - auto ret = GetRequest(); - BOOST_REQUIRE_EQUAL(ret.body(), "bar"); - requestReadPromise.set_value(); - // Do not send a response but react to the shutdown to be polite. - shutdownPromise.get_future().get(); - Shutdown(); - }}; - - TestThread timeoutThread{[&]() { - // Synchronize with after mockTargetThread has read the request - requestReadPromise.get_future().get(); - BOOST_REQUIRE(GetConnection().IsConnected()); - BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); - BOOST_REQUIRE(!GetConnection().IsConnected()); - }}; - - boost::beast::http::request request; - request.body() = "bar"; - request.method(boost::beast::http::verb::get); - request.target("foo"); - request.prepare_payload(); - BOOST_REQUIRE_THROW(GetConnection().Send(request), PerfdataWriterConnection::Stopped); - shutdownPromise.set_value(); - - REQUIRE_JOINS_WITHIN(timeoutThread, 1s); - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -/* This test simulates a server that closes the connection and reappears at a later time. - * PerfdataWriterConnection should detect the disconnect, catch the exception and attempt to - * reconnect without exiting Send(). - */ -BOOST_AUTO_TEST_CASE(reconnect_failed) -{ - TestThread mockTargetThread{[&]() { - Accept(); - Handshake(); - auto ret = GetDataUntil("#"); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - - ResetStream(); - - Accept(); - Handshake(); - - ret = GetDataUntil("#"); - BOOST_REQUIRE_EQUAL(ret, "foobar"); - ret = GetDataUntil("\n"); - - Shutdown(); - }}; - - // Allocate a large string that will fill the buffers on both sides of the connection, in - // order to make Send() block. - auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); - randomData.push_back('\n'); - BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{randomData.data(), randomData.size()})); - BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); - - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -/* This tests if retrying an http send will reproducibly lead to the exact same message being - * received. Normally this us guaranteed by the interface only accepting a const reference, but - * since on older boost versions the async_write() functions also accept non-const references, it - * doesn't hurt to ensure this with a test-case. - */ -BOOST_AUTO_TEST_CASE(http_send_retry) -{ - TestThread mockTargetThread{[&] { - Accept(); - Handshake(); - - /* Read only the first 512 bytes of the request body, since we don't want to unblock the client yet. - */ - auto request = GetRequest(512); - BOOST_REQUIRE_MESSAGE( - request.method() == boost::beast::http::verb::post, - "Request method is not POST: " << request.method_string() - ); - BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); - BOOST_REQUIRE_MESSAGE( - request.body().compare(0, 7, "foobar#") == 0, - "Request body does not start with 'foobar#': " << request.body().substr(0, 7) - ); - - ResetStream(); - Accept(); - Handshake(); - - /* Read the entire response now and verify that we still get the expected body, - * even though the first read was only partial. - */ - request = GetRequest(); - BOOST_REQUIRE_MESSAGE( - request.method() == boost::beast::http::verb::post, - "Request method is not POST: " << request.method_string() - ); - BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); - BOOST_REQUIRE_MESSAGE( - request.body().compare(0, 7, "foobar#") == 0, - "Request body does not start with 'foobar#': " << request.body().substr(0, 7) - ); - - /* The body size is 4MB + 7 bytes (7 bytes for the "foobar#" prefix of the generated message) - */ - BOOST_REQUIRE_MESSAGE( - request.body().size() == (4UL * 1024 * 1024) + 7, - "Request body is not the expected size: " << request.body().size() - ); - - SendResponse(); - - Shutdown(); - }}; - - boost::beast::http::request request{boost::beast::http::verb::post, "foo", 10}; - request.set(boost::beast::http::field::host, "localhost:" + std::to_string(GetPort())); - - /* Allocate a large string that will fill the buffers on both sides of the connection, in - * order to make Send() block. - */ - request.body() = GetRandomString("foobar#", 4UL * 1024 * 1024); - request.prepare_payload(); - BOOST_REQUIRE_NO_THROW(GetConnection().Send(request)); - BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); - - REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatawriterfixture.hpp b/test/perfdata-perfdatawriterfixture.hpp deleted file mode 100644 index e70b2123c84..00000000000 --- a/test/perfdata-perfdatawriterfixture.hpp +++ /dev/null @@ -1,139 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#pragma once - -#include -#include "base/perfdatavalue.hpp" -#include "config/configcompiler.hpp" -#include "config/configitem.hpp" -#include "icinga/host.hpp" -#include "test/base-testloggerfixture.hpp" -#include "test/perfdata-perfdatatargetfixture.hpp" -#include "test/utils.hpp" -#include - -namespace icinga { - -template -class PerfdataWriterFixture : public PerfdataWriterTargetFixture, public TestLoggerFixture -{ -public: - PerfdataWriterFixture() : m_Writer(new Writer) - { - auto createObjects = [&]() { - String config = R"CONFIG( -object CheckCommand "dummy" { - command = "/bin/echo" -} -object Host "h1" { - address = "h1" - check_command = "dummy" - enable_notifications = true - enable_active_checks = false - enable_passive_checks = true -} -)CONFIG"; - - std::unique_ptr expr = ConfigCompiler::CompileText("", config); - expr->Evaluate(*ScriptFrame::GetCurrentFrame()); - }; - - ConfigItem::RunWithActivationContext(new Function("CreateTestObjects", createObjects)); - - m_Host = Host::GetByName("h1"); - BOOST_REQUIRE(m_Host); - - m_Writer->SetPort(std::to_string(GetPort())); - m_Writer->SetName(m_Writer->GetReflectionType()->GetName()); - m_Writer->SetDisconnectTimeout(0.05); - m_Writer->Register(); - - auto hasFlushInterval = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushInterval(0.05)) {}); - if constexpr (decltype(hasFlushInterval(std::declval()))::value) { - m_Writer->SetFlushInterval(0.05); - } - - auto hasFlushThreshold = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushThreshold(1)) {}); - if constexpr (decltype(hasFlushThreshold(std::declval()))::value) { - m_Writer->SetFlushThreshold(1); - } - } - - void ReceiveCheckResults( - std::size_t num, - ServiceState state, - const std::function& fn = {} - ) - { - ::ReceiveCheckResults(m_Host, num, state, fn); - } - - std::size_t GetWorkQueueLength() - { - Array::Ptr dummy = new Array; - Dictionary::Ptr status = new Dictionary; - m_Writer->StatsFunc(status, dummy); - ObjectLock lock{status}; - // Unpack the single-key top-level dictionary - Dictionary::Ptr writer = status->Begin()->second; - BOOST_REQUIRE(writer); - Dictionary::Ptr values = writer->Get(m_Writer->GetName()); - BOOST_REQUIRE(values); - BOOST_REQUIRE(values->Contains("work_queue_items")); - return values->Get("work_queue_items"); - } - - /** - * Processes check results until the writer's work queue is no longer moving. - * - * @param timeout Time after which to give up trying to get the writer stuck - * @return true if the writer is now stuck - */ - bool GetWriterStuck(std::chrono::milliseconds timeout) - { - auto start = std::chrono::steady_clock::now(); - std::size_t unchangedCount = 0; - while(true){ - ReceiveCheckResults(10, ServiceCritical, [&](const CheckResult::Ptr& cr) { - cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("", 4096), 1}); - }); - - if (std::chrono::steady_clock::now() - start >= timeout) { - return false; - } - - auto numWq = GetWorkQueueLength(); - if (numWq >= 10) { - std::this_thread::sleep_for(1ms); - if (numWq == GetWorkQueueLength()) { - if (unchangedCount < 5) { - ++unchangedCount; - continue; - } - return true; - } - - unchangedCount = 0; - } - } - } - - void ResumeWriter() - { - static_cast(m_Writer)->OnConfigLoaded(); - m_Writer->SetActive(true); - m_Writer->Activate(); - BOOST_REQUIRE(!m_Writer->IsPaused()); - } - - void PauseWriter() { static_cast(m_Writer)->Pause(); } - - auto GetWriter() { return m_Writer; } - -private: - Host::Ptr m_Host; - typename Writer::Ptr m_Writer; -}; - -} // namespace icinga diff --git a/test/remote-certificate-fixture.cpp b/test/remote-certificate-fixture.cpp index 7e02edb85d0..0d5a60d99e4 100644 --- a/test/remote-certificate-fixture.cpp +++ b/test/remote-certificate-fixture.cpp @@ -95,19 +95,19 @@ void RequiresCertificate::AddCaFixture(const String& caFixtureName) m_CaFixtures.emplace_back(caFixtureName); } -void RequiresCertificate::AddCertFixture(const String& name, const String& caFixture, const String& certFixture) +void RequiresCertificate::AddCertFixture(const String& cn, const String& caFixture, const String& certFixture) { auto& mts = boost::unit_test::framework::master_test_suite(); boost::unit_test::decorator::base_ptr certLabel{new boost::unit_test::label{"cert"}}; auto* setup = boost::unit_test::make_test_case( - [name]() { + [cn]() { CertificateFixture certFixture; auto persistentCertsPath = CertificateFixture::m_PersistentCertsDir / "certs"; - auto keyFile = persistentCertsPath / (name.GetData() + ".key"); - auto csrFile = persistentCertsPath / (name.GetData() + ".csr"); - auto crtFile = persistentCertsPath / (name.GetData() + ".crt"); - PkiUtility::NewCert("localhost", keyFile.string(), csrFile.string(), ""); + auto keyFile = persistentCertsPath / (cn.GetData() + ".key"); + auto csrFile = persistentCertsPath / (cn.GetData() + ".csr"); + auto crtFile = persistentCertsPath / (cn.GetData() + ".crt"); + PkiUtility::NewCert(cn, keyFile.string(), csrFile.string(), ""); PkiUtility::SignCsr(csrFile.string(), crtFile.string()); }, certFixture.GetData() + "_setup", diff --git a/test/remote-certificate-fixture.hpp b/test/remote-certificate-fixture.hpp index 4f854c899f7..f09a9b5c7c6 100644 --- a/test/remote-certificate-fixture.hpp +++ b/test/remote-certificate-fixture.hpp @@ -95,7 +95,7 @@ class RequiresCertificate : public CTestPropertiesBase static inline std::vector m_CaFixtures; static void AddCaFixture(const String& caFixtureName); - static void AddCertFixture(const String& name, const String& caFixture, const String& certFixture); + static void AddCertFixture(const String& cn, const String& caFixture, const String& certFixture); }; } // namespace icinga diff --git a/test/test-thread.hpp b/test/test-thread.hpp deleted file mode 100644 index f7838328253..00000000000 --- a/test/test-thread.hpp +++ /dev/null @@ -1,60 +0,0 @@ -// SPDX-FileCopyrightText: 2026 Icinga GmbH -// SPDX-License-Identifier: GPL-3.0-or-later - -#pragma once - -#include -#include -#include -#include - -#define REQUIRE_JOINS_WITHIN(t, timeout) \ - BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") -#define CHECK_JOINS_WITHIN(t, timeout) \ - BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") -#define TEST_JOINS_WITHIN(t, timeout) \ - BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") - -#define REQUIRE_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") -#define CHECK_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") -#define TEST_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") - -namespace icinga { - -class TestThread -{ -public: - explicit TestThread(std::function fn) : TestThread(std::move(fn), std::promise{}) {} - - bool Joinable() - { - auto status = m_JoinFuture.wait_for(std::chrono::milliseconds{0}); - return status == std::future_status::ready; - } - - template - bool TryJoinWithin(std::chrono::duration timeout) - { - auto status = m_JoinFuture.wait_for(timeout); - if (status == std::future_status::ready) { - m_Thread.join(); - return true; - } - return false; - } - -private: - explicit TestThread(std::function fn, std::promise joinPromise) - : m_JoinFuture(joinPromise.get_future()), - m_Thread([fn = std::move(fn), jp = std::move(joinPromise)]() mutable { - fn(); - jp.set_value(); - }) - { - } - - std::future m_JoinFuture; - std::thread m_Thread; -}; - -} // namespace icinga diff --git a/test/utils.cpp b/test/utils.cpp index 95a9936cd4c..a0aba80d0f3 100644 --- a/test/utils.cpp +++ b/test/utils.cpp @@ -2,10 +2,8 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "utils.hpp" -#include "base/perfdatavalue.hpp" #include #include -#include #include #include @@ -68,58 +66,3 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture() #endif tzset(); } - -std::string GetRandomString(std::string prefix, std::size_t length) -{ - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution distribution('!', '~'); - - for (auto i = 0U; i < length; i++) { - prefix += static_cast(distribution(gen)); - } - - return prefix; -} - -/** - * Make our test host receive a number of check-results. - * - * @param num The number of check-results to receive - * @param state The state the check results should have - * @param fn A function that will be passed the current check-result - */ -void ReceiveCheckResults( - const icinga::Checkable::Ptr& host, - std::size_t num, - icinga::ServiceState state, - const std::function& fn -) -{ - using namespace icinga; - - StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); - - for (auto i = 0UL; i < num; ++i) { - CheckResult::Ptr cr = new CheckResult(); - - cr->SetState(state); - - double now = Utility::GetTime(); - cr->SetActive(false); - cr->SetScheduleStart(now); - cr->SetScheduleEnd(now); - cr->SetExecutionStart(now); - cr->SetExecutionEnd(now); - - Array::Ptr perfData = new Array; - perfData->Add(new PerfdataValue{"dummy", 42}); - cr->SetPerformanceData(perfData); - - if (fn) { - fn(cr); - } - - BOOST_REQUIRE(host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); - } -} diff --git a/test/utils.hpp b/test/utils.hpp index ec8b245d401..67d2575a207 100644 --- a/test/utils.hpp +++ b/test/utils.hpp @@ -3,9 +3,7 @@ #pragma once -#include "icinga/host.hpp" #include -#include #include tm make_tm(std::string s); @@ -26,12 +24,3 @@ struct GlobalTimezoneFixture char *tz; }; - -std::string GetRandomString(std::string prefix, std::size_t length); - -void ReceiveCheckResults( - const icinga::Checkable::Ptr& host, - std::size_t num, - icinga::ServiceState state, - const std::function& fn = {} -);