diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs similarity index 93% rename from src/StatsdClient/Bufferize/StatsBufferize.cs rename to src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs index 278b89ad..663d9a28 100644 --- a/src/StatsdClient/Bufferize/StatsBufferize.cs +++ b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs @@ -5,13 +5,13 @@ namespace StatsdClient.Bufferize { /// - /// StatsBufferize bufferizes metrics before sending them. + /// AsynchronousBufferizedSender bufferizes metrics before sending them. /// - internal class StatsBufferize : IDisposable + internal class AsynchronousBufferizedSender : IStatsSender { private readonly AsynchronousWorker _worker; - public StatsBufferize( + public AsynchronousBufferizedSender( StatsRouter statsRouter, int workerMaxItemCount, TimeSpan? blockingQueueTimeout, diff --git a/src/StatsdClient/Bufferize/IStatsSender.cs b/src/StatsdClient/Bufferize/IStatsSender.cs new file mode 100644 index 00000000..8c544ad7 --- /dev/null +++ b/src/StatsdClient/Bufferize/IStatsSender.cs @@ -0,0 +1,17 @@ +using System; +using StatsdClient.Statistic; + +namespace StatsdClient.Bufferize +{ + /// + /// IStatsSender defines the contract for sending stats to the pipeline. + /// + internal interface IStatsSender : IDisposable + { + bool TryDequeueFromPool(out Stats stats); + + void Send(Stats stats); + + void Flush(); + } +} diff --git a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs similarity index 76% rename from src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs rename to src/StatsdClient/Bufferize/IStatsSenderFactory.cs index 52667a3d..cb4a2a0a 100644 --- a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs +++ b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs @@ -7,12 +7,12 @@ namespace StatsdClient.Bufferize { /// - /// IStatsBufferizeFactory is a factory for StatsBufferize. - /// It is used to test StatsBufferize. + /// IStatsSenderFactory is a factory for creating IStatsSender instances. + /// It is used to test StatsdBuilder. /// - internal interface IStatsBufferizeFactory + internal interface IStatsSenderFactory { - StatsBufferize CreateStatsBufferize( + IStatsSender CreateAsynchronousBufferizedSender( StatsRouter statsRouter, int workerMaxItemCount, TimeSpan? blockingQueueTimeout, @@ -38,6 +38,7 @@ Telemetry CreateTelemetry( TimeSpan flushInterval, ITransport transport, string[] globalTags, - Action optionalExceptionHandler); + Action optionalExceptionHandler, + bool synchronousMode = false); } } \ No newline at end of file diff --git a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/StatsSenderFactory.cs similarity index 85% rename from src/StatsdClient/Bufferize/StatsBufferizeFactory.cs rename to src/StatsdClient/Bufferize/StatsSenderFactory.cs index 4ad91585..3386f712 100644 --- a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs +++ b/src/StatsdClient/Bufferize/StatsSenderFactory.cs @@ -6,16 +6,16 @@ namespace StatsdClient.Bufferize { - internal class StatsBufferizeFactory : IStatsBufferizeFactory + internal class StatsSenderFactory : IStatsSenderFactory { - public StatsBufferize CreateStatsBufferize( + public IStatsSender CreateAsynchronousBufferizedSender( StatsRouter statsRouter, int workerMaxItemCount, TimeSpan? blockingQueueTimeout, TimeSpan maxIdleWaitBeforeSending, Action optionalExceptionHandler) { - return new StatsBufferize( + return new AsynchronousBufferizedSender( statsRouter, workerMaxItemCount, blockingQueueTimeout, @@ -49,9 +49,10 @@ public Telemetry CreateTelemetry( TimeSpan flushInterval, ITransport transport, string[] globalTags, - Action optionalExceptionHandler) + Action optionalExceptionHandler, + bool synchronousMode = false) { - return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler); + return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler, synchronousMode); } public ITransport CreateNamedPipeTransport(string pipeName) diff --git a/src/StatsdClient/Bufferize/SynchronousSender.cs b/src/StatsdClient/Bufferize/SynchronousSender.cs new file mode 100644 index 00000000..fcb0b6f2 --- /dev/null +++ b/src/StatsdClient/Bufferize/SynchronousSender.cs @@ -0,0 +1,86 @@ +using System; +using StatsdClient.Statistic; + +namespace StatsdClient.Bufferize +{ + /// + /// SynchronousSender sends metrics synchronously on the calling thread. + /// This is intended for serverless environments (e.g., AWS Lambda) where + /// background threads may be frozen between invocations. + /// + internal class SynchronousSender : IStatsSender + { + [ThreadStatic] + private static Stats _threadLocalStats; + + private readonly StatsRouter _statsRouter; + private readonly Action _optionalExceptionHandler; + private readonly object _lock = new object(); + + public SynchronousSender(StatsRouter statsRouter, Action optionalExceptionHandler = null) + { + _statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter)); + _optionalExceptionHandler = optionalExceptionHandler; + } + + public bool TryDequeueFromPool(out Stats stats) + { + if (_threadLocalStats == null) + { + _threadLocalStats = new Stats(); + } + + stats = _threadLocalStats; + return true; + } + + public void Send(Stats stats) + { + try + { + lock (_lock) + { + _statsRouter.Route(stats); + } + } + catch (Exception e) + { + if (_optionalExceptionHandler != null) + { + _optionalExceptionHandler.Invoke(e); + } + else + { + throw; + } + } + } + + public void Flush() + { + try + { + lock (_lock) + { + _statsRouter.Flush(); + } + } + catch (Exception e) + { + if (_optionalExceptionHandler != null) + { + _optionalExceptionHandler.Invoke(e); + } + else + { + throw; + } + } + } + + public void Dispose() + { + Flush(); + } + } +} diff --git a/src/StatsdClient/DogStatsdService.cs b/src/StatsdClient/DogStatsdService.cs index 79c11edb..712e2ffe 100644 --- a/src/StatsdClient/DogStatsdService.cs +++ b/src/StatsdClient/DogStatsdService.cs @@ -11,7 +11,7 @@ namespace StatsdClient /// public class DogStatsdService : IDogStatsd, IDisposable { - private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsBufferizeFactory()); + private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsSenderFactory()); private MetricsSender _metricsSender; private StatsdData _statsdData; private StatsdConfig _config; diff --git a/src/StatsdClient/MetricsSender.cs b/src/StatsdClient/MetricsSender.cs index 68c8e195..f7afb775 100644 --- a/src/StatsdClient/MetricsSender.cs +++ b/src/StatsdClient/MetricsSender.cs @@ -7,14 +7,14 @@ namespace StatsdClient internal class MetricsSender { private readonly Telemetry _optionalTelemetry; - private readonly StatsBufferize _statsBufferize; + private readonly IStatsSender _statsSender; private readonly bool _truncateIfTooLong; private readonly IStopWatchFactory _stopwatchFactory; private readonly IRandomGenerator _randomGenerator; private readonly Cardinality? _defaultCardinality; internal MetricsSender( - StatsBufferize statsBufferize, + IStatsSender statsSender, IRandomGenerator randomGenerator, IStopWatchFactory stopwatchFactory, Telemetry optionalTelemetry, @@ -22,7 +22,7 @@ internal MetricsSender( Cardinality? defaultCardinality = null) { _stopwatchFactory = stopwatchFactory; - _statsBufferize = statsBufferize; + _statsSender = statsSender; _randomGenerator = randomGenerator; _optionalTelemetry = optionalTelemetry; _truncateIfTooLong = truncateIfTooLong; @@ -142,7 +142,7 @@ public void Send(Action actionToTime, string statName, double sampleRate = 1.0, private bool TryDequeueStats(out Stats stats) { - if (_statsBufferize.TryDequeueFromPool(out stats)) + if (_statsSender.TryDequeueFromPool(out stats)) { return true; } @@ -151,6 +151,6 @@ private bool TryDequeueStats(out Stats stats) return false; } - private void Send(Stats metricFields) => _statsBufferize.Send(metricFields); + private void Send(Stats metricFields) => _statsSender.Send(metricFields); } } diff --git a/src/StatsdClient/StatsdBuilder.cs b/src/StatsdClient/StatsdBuilder.cs index f7aa087b..a4d89327 100644 --- a/src/StatsdClient/StatsdBuilder.cs +++ b/src/StatsdClient/StatsdBuilder.cs @@ -16,9 +16,9 @@ internal class StatsdBuilder public static readonly string UnixDomainSocketPrefix = "unix://"; private const string _entityIdInternalTagKey = "dd.internal.entity_id"; - private readonly IStatsBufferizeFactory _factory; + private readonly IStatsSenderFactory _factory; - public StatsdBuilder(IStatsBufferizeFactory factory) + public StatsdBuilder(IStatsSenderFactory factory) { _factory = factory; } @@ -32,24 +32,25 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional var originDetectionEnabled = IsOriginDetectionEnabled(config); var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID); - var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler); - var statsBufferize = CreateStatsBufferize( + var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode); + var statsSender = CreateStatsSender( telemetry, transportData.Transport, transportData.BufferCapacity, config.Advanced, serializers, config.ClientSideAggregation, + config.SynchronousMode, optionalExceptionHandler); var metricsSender = new MetricsSender( - statsBufferize, + statsSender, new RandomGenerator(), new StopWatchFactory(), telemetry, config.StatsdTruncateIfTooLong, config.Cardinality); - return new StatsdData(metricsSender, statsBufferize, transport, telemetry); + return new StatsdData(metricsSender, statsSender, transport, telemetry); } private static bool IsOriginDetectionEnabled(StatsdConfig config) @@ -179,7 +180,8 @@ private Telemetry CreateTelemetry( string[] globalTags, DogStatsdEndPoint dogStatsdEndPoint, ITransport transport, - Action optionalExceptionHandler) + Action optionalExceptionHandler, + bool synchronousMode = false) { var telemetryFlush = config.Advanced.TelemetryFlushInterval; @@ -194,7 +196,7 @@ private Telemetry CreateTelemetry( telemetryTransport = CreateTransport(optionalTelemetryEndPoint, config); } - return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler); + return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler, synchronousMode); } // Telemetry is not enabled @@ -244,13 +246,14 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf return transportData; } - private StatsBufferize CreateStatsBufferize( + private IStatsSender CreateStatsSender( Telemetry telemetry, ITransport transport, int bufferCapacity, AdvancedStatsConfig config, Serializers serializers, ClientSideAggregationConfig optionalClientSideAggregationConfig, + bool synchronousMode, Action optionalExceptionHandler) { var bufferHandler = new BufferBuilderHandler(telemetry, transport); @@ -276,14 +279,17 @@ private StatsBufferize CreateStatsBufferize( var statsRouter = _factory.CreateStatsRouter(serializers, bufferBuilder, optionalAggregators); - var statsBufferize = _factory.CreateStatsBufferize( + if (synchronousMode) + { + return new SynchronousSender(statsRouter, optionalExceptionHandler); + } + + return _factory.CreateAsynchronousBufferizedSender( statsRouter, config.MaxMetricsInAsyncQueue, config.MaxBlockDuration, config.DurationBeforeSendingNotFullBuffer, optionalExceptionHandler); - - return statsBufferize; } private ITransport CreateUDPTransport(DogStatsdEndPoint endPoint) diff --git a/src/StatsdClient/StatsdConfig.cs b/src/StatsdClient/StatsdConfig.cs index 3a8c76a3..3e861fe4 100644 --- a/src/StatsdClient/StatsdConfig.cs +++ b/src/StatsdClient/StatsdConfig.cs @@ -160,5 +160,15 @@ public StatsdConfig() /// This value is used when no cardinality is explicitly specified. /// public Cardinality? Cardinality { get; set; } + + /// + /// Gets or sets a value indicating whether the client operates in synchronous mode. + /// When enabled, metrics are sent directly on the calling thread without background + /// processing. This is recommended for serverless environments (e.g., AWS Lambda) + /// where background threads may be frozen between invocations. + /// Call Flush() at the end of each invocation to ensure all buffered metrics are sent. + /// Default is false. + /// + public bool SynchronousMode { get; set; } = false; } } diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs index f8ba7610..5011cea6 100644 --- a/src/StatsdClient/StatsdData.cs +++ b/src/StatsdClient/StatsdData.cs @@ -7,17 +7,17 @@ namespace StatsdClient internal class StatsdData : IDisposable { private ITransport _transport; - private StatsBufferize _statsBufferize; + private IStatsSender _statsSender; public StatsdData( MetricsSender metricsSender, - StatsBufferize statsBufferize, + IStatsSender statsSender, ITransport transport, Telemetry telemetry) { MetricsSender = metricsSender; Telemetry = telemetry; - _statsBufferize = statsBufferize; + _statsSender = statsSender; _transport = transport; } @@ -27,7 +27,7 @@ public StatsdData( public void Flush(bool flushTelemetry) { - _statsBufferize?.Flush(); + _statsSender?.Flush(); if (flushTelemetry) { Telemetry.Flush(); @@ -36,14 +36,14 @@ public void Flush(bool flushTelemetry) public void Dispose() { - // _statsBufferize and _telemetry must be disposed before _statsSender to make + // _statsSender and _telemetry must be disposed before _statsSender to make // sure _statsSender does not receive data when it is already disposed. Telemetry?.Dispose(); Telemetry = null; - _statsBufferize?.Dispose(); - _statsBufferize = null; + _statsSender?.Dispose(); + _statsSender = null; _transport?.Dispose(); _transport = null; diff --git a/src/StatsdClient/Telemetry.cs b/src/StatsdClient/Telemetry.cs index 15e55040..3366b1db 100644 --- a/src/StatsdClient/Telemetry.cs +++ b/src/StatsdClient/Telemetry.cs @@ -40,7 +40,8 @@ public Telemetry( TimeSpan flushInterval, ITransport transport, string[] globalTags, - Action optionalExceptionHandler) + Action optionalExceptionHandler, + bool synchronousMode = false) { _optionalMetricSerializer = metricSerializer; _optionalTransport = transport; @@ -53,11 +54,14 @@ public Telemetry( _aggregatedContexts.Add(MetricType.Count, new ValueWithTags(_optionalTags, "metrics_type:count")); _aggregatedContexts.Add(MetricType.Set, new ValueWithTags(_optionalTags, "metrics_type:set")); _optionalExceptionHandler = optionalExceptionHandler; - _optionalTimer = new Timer( - _ => Flush(), - null, - flushInterval, - flushInterval); + if (!synchronousMode) + { + _optionalTimer = new Timer( + _ => Flush(), + null, + flushInterval, + flushInterval); + } } public static string MetricsMetricName => _telemetryPrefix + "metrics"; diff --git a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs similarity index 82% rename from tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs rename to tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs index 964c6843..29c6075a 100644 --- a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs +++ b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs @@ -12,11 +12,11 @@ namespace Tests { [TestFixture] - public class StatsBufferizeTests + public class AsynchronousBufferizedSenderTests { [Test] [Timeout(10000)] - public void StatsBufferize() + public void AsynchronousBufferizedSender() { var handler = new BufferBuilderHandlerMock(); var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler); @@ -25,13 +25,13 @@ public void StatsBufferize() EventSerializer = new EventSerializer(new SerializerHelper(null, null)), }; var statsRouter = new StatsRouter(serializers, bufferBuilder, null); - using (var statsBufferize = new StatsBufferize(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler)) + using (var sender = new AsynchronousBufferizedSender(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler)) { var stats = new Stats { Kind = StatsKind.Event }; stats.Event.Text = "test"; stats.Event.Title = "title"; - statsBufferize.Send(stats); + sender.Send(stats); while (handler.Buffer == null) { Task.Delay(TimeSpan.FromMilliseconds(1)).Wait(); diff --git a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs new file mode 100644 index 00000000..8e0e611c --- /dev/null +++ b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs @@ -0,0 +1,349 @@ +using System; +using System.Collections.Concurrent; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using StatsdClient; +using StatsdClient.Bufferize; +using StatsdClient.Statistic; +using Tests.Utils; + +namespace Tests +{ + [TestFixture] + public class SynchronousSenderTests + { + [Test] + public void TryDequeueFromPoolAlwaysSucceeds() + { + var handler = new BufferBuilderHandlerMock(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + for (int i = 0; i < 100; i++) + { + Assert.IsTrue(sender.TryDequeueFromPool(out var stats)); + Assert.IsNotNull(stats); + } + } + } + + [Test] + public void SendAndFlushDeliversMetrics() + { + var handler = new BufferBuilderHandlerMock(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = "test.counter"; + stats.Metric.NumericValue = 42; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + + sender.Send(stats); + + // Buffer hasn't been flushed yet (not full) + Assert.IsNull(handler.Buffer); + + sender.Flush(); + + // After flush, data should be delivered + Assert.IsNotNull(handler.Buffer); + Assert.AreEqual("test.counter:42|c\n", handler.BufferToString()); + } + } + + [Test] + public void MultipleMetricsBatchInBuffer() + { + var handler = new BufferBuilderHandlerMock(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + sender.TryDequeueFromPool(out var stats); + + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = "counter1"; + stats.Metric.NumericValue = 1; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + sender.Send(stats); + + stats.Metric.StatName = "counter2"; + stats.Metric.NumericValue = 2; + sender.Send(stats); + + // Nothing sent yet + Assert.IsNull(handler.Buffer); + + sender.Flush(); + + // Both metrics in a single packet + Assert.IsNotNull(handler.Buffer); + var result = handler.BufferToString(); + Assert.That(result, Does.Contain("counter1:1|c")); + Assert.That(result, Does.Contain("counter2:2|c")); + } + } + + [Test] + public void BufferAutoFlushesWhenFull() + { + var handler = new BufferBuilderHandlerMock(); + + // Small buffer capacity: first metric fits, second triggers auto-flush of the first + var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = "first.metric"; + stats.Metric.NumericValue = 1; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + sender.Send(stats); + + // First metric fits in buffer, nothing flushed yet + Assert.IsNull(handler.Buffer); + + // Second metric won't fit alongside the first, triggering auto-flush + stats.Metric.StatName = "second.metric"; + stats.Metric.NumericValue = 2; + sender.Send(stats); + + // First metric should have been auto-flushed + Assert.IsNotNull(handler.Buffer); + Assert.AreEqual("first.metric:1|c\n", handler.BufferToString()); + } + } + + [Test] + public void DisposeTriggersFlush() + { + var handler = new BufferBuilderHandlerMock(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + var sender = new SynchronousSender(statsRouter); + + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Gauge; + stats.Metric.StatName = "test.gauge"; + stats.Metric.NumericValue = 100; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + sender.Send(stats); + + Assert.IsNull(handler.Buffer); + + sender.Dispose(); + + Assert.IsNotNull(handler.Buffer); + Assert.AreEqual("test.gauge:100|g\n", handler.BufferToString()); + } + + [Test] + public void ThreadSafety() + { + var handler = new ConcurrentBufferBuilderHandler(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + const int threadCount = 10; + const int metricsPerThread = 100; + var barrier = new Barrier(threadCount); + var tasks = new Task[threadCount]; + + for (int t = 0; t < threadCount; t++) + { + int threadIndex = t; + tasks[t] = Task.Run(() => + { + barrier.SignalAndWait(); + for (int i = 0; i < metricsPerThread; i++) + { + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = $"thread{threadIndex}.counter"; + stats.Metric.NumericValue = 1; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + sender.Send(stats); + } + }); + } + + Task.WaitAll(tasks); + sender.Flush(); + + // Verify no exceptions were thrown and all data was captured + var allData = handler.GetAllData(); + Assert.IsNotEmpty(allData); + + // Verify each thread's metrics appear in the output + for (int t = 0; t < threadCount; t++) + { + Assert.That(allData, Does.Contain($"thread{t}.counter:1|c")); + } + } + } + + [Test] + public void FlushWhenEmptyDoesNotThrow() + { + var handler = new BufferBuilderHandlerMock(); + var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + using (var sender = new SynchronousSender(statsRouter)) + { + Assert.DoesNotThrow(() => sender.Flush()); + Assert.DoesNotThrow(() => sender.Flush()); + } + } + + [Test] + public void ExceptionsForwardedToHandler() + { + var throwingHandler = new ThrowingBufferBuilderHandler(); + var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + Exception caughtException = null; + Action exceptionHandler = e => caughtException = e; + + using (var sender = new SynchronousSender(statsRouter, exceptionHandler)) + { + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = "will.fail"; + stats.Metric.NumericValue = 1; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + + // Send enough data to trigger a buffer flush (buffer is 20 bytes, metric is ~15) + sender.Send(stats); + sender.Send(stats); + + // Exception should have been caught by the handler, not propagated + Assert.IsNotNull(caughtException); + Assert.That(caughtException.Message, Does.Contain("Transport error")); + } + } + + [Test] + public void ExceptionsRethrowWhenNoHandler() + { + var throwingHandler = new ThrowingBufferBuilderHandler(); + var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null); + var serializers = new Serializers + { + MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty), + }; + var statsRouter = new StatsRouter(serializers, bufferBuilder, null); + + var sender = new SynchronousSender(statsRouter); + + sender.TryDequeueFromPool(out var stats); + stats.Kind = StatsKind.Metric; + stats.Metric.MetricType = MetricType.Count; + stats.Metric.StatName = "will.fail"; + stats.Metric.NumericValue = 1; + stats.Metric.SampleRate = 1.0; + stats.Metric.Tags = null; + + // First send buffers the metric; second triggers a flush which throws + sender.Send(stats); + var ex = Assert.Throws(() => sender.Send(stats)); + Assert.That(ex.Message, Does.Contain("Transport error")); + } + + /// + /// Thread-safe handler that accumulates all buffers for verification. + /// + private class ConcurrentBufferBuilderHandler : IBufferBuilderHandler + { + private readonly ConcurrentBag _buffers = new ConcurrentBag(); + + public void Handle(byte[] buffer, int length) + { + var data = new byte[length]; + Array.Copy(buffer, data, length); + _buffers.Add(Encoding.UTF8.GetString(data)); + } + + public string GetAllData() + { + var sb = new StringBuilder(); + foreach (var buf in _buffers) + { + sb.Append(buf); + } + + return sb.ToString(); + } + } + + /// + /// Handler that throws on Handle to simulate transport failures. + /// + private class ThrowingBufferBuilderHandler : IBufferBuilderHandler + { + public void Handle(byte[] buffer, int length) + { + throw new InvalidOperationException("Transport error"); + } + } + } +} diff --git a/tests/StatsdClient.Tests/StatsdBuilderTests.cs b/tests/StatsdClient.Tests/StatsdBuilderTests.cs index a5a00af0..09da0ba7 100644 --- a/tests/StatsdClient.Tests/StatsdBuilderTests.cs +++ b/tests/StatsdClient.Tests/StatsdBuilderTests.cs @@ -28,7 +28,7 @@ public class StatsdBuilderTests StatsdConfig.VersionEnvVar, }; - private Mock _mock; + private Mock _mock; private StatsdBuilder _statsdBuilder; private UnixEndPoint _unixEndPoint; private IPEndPoint _ipEndPoint; @@ -36,7 +36,7 @@ public class StatsdBuilderTests [SetUp] public void Init() { - _mock = new Mock(MockBehavior.Loose); + _mock = new Mock(MockBehavior.Loose); _statsdBuilder = new StatsdBuilder(_mock.Object); _mock.Setup(m => m.CreateUnixDomainSocketTransport( It.IsAny(), @@ -56,6 +56,13 @@ public void Init() }); _ipEndPoint = null; + _mock.Setup(m => m.CreateStatsRouter( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + (s, b, a) => new StatsRouter(s, b, a)); + foreach (var key in _envVarsKeyToRestore) { _envVarsToRestore[key] = Environment.GetEnvironmentVariable(key); @@ -122,7 +129,7 @@ public void UDSStatsdServerName() } [Test] - public void CreateStatsBufferizeUDP() + public void CreateAsynchronousBufferizedSenderUDP() { var config = new StatsdConfig { }; var conf = config.Advanced; @@ -134,7 +141,7 @@ public void CreateStatsBufferizeUDP() conf.DurationBeforeSendingNotFullBuffer = TimeSpan.FromMilliseconds(4); BuildStatsData(config); - _mock.Verify(m => m.CreateStatsBufferize( + _mock.Verify(m => m.CreateAsynchronousBufferizedSender( It.IsAny(), conf.MaxMetricsInAsyncQueue, conf.MaxBlockDuration, @@ -148,7 +155,7 @@ public void CreateStatsBufferizeUDP() } [Test] - public void CreateStatsBufferizeUDS() + public void CreateAsynchronousBufferizedSenderUDS() { // Skip on Windows if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) @@ -160,7 +167,7 @@ public void CreateStatsBufferizeUDS() config.StatsdMaxUnixDomainSocketPacketSize = 20; BuildStatsData(config); - _mock.Verify(m => m.CreateStatsBufferize( + _mock.Verify(m => m.CreateAsynchronousBufferizedSender( It.IsAny(), It.IsAny(), null, @@ -202,7 +209,8 @@ public void CreateTelemetry() conf.TelemetryFlushInterval.Value, It.IsAny(), It.Is(tags => Enumerable.SequenceEqual(tags, expectedTags)), - Tools.ExceptionHandler)); + Tools.ExceptionHandler, + false)); } [Test] @@ -237,6 +245,56 @@ public void ClientSideAggregation() It.IsNotNull())); } + [Test] + public void SynchronousModeSkipsAsyncBufferize() + { + var config = new StatsdConfig { }; + config.SynchronousMode = true; + config.Advanced.TelemetryFlushInterval = null; + + BuildStatsData(config); + + // StatsRouter should still be created + _mock.Verify( + m => m.CreateStatsRouter( + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Once); + + // CreateAsynchronousBufferizedSender should NOT be called in synchronous mode + _mock.Verify( + m => m.CreateAsynchronousBufferizedSender( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>()), + Times.Never); + } + + [Test] + public void SynchronousModeTelemetry() + { + var config = new StatsdConfig { }; + config.SynchronousMode = true; + config.Advanced.TelemetryFlushInterval = TimeSpan.FromMinutes(1); + + BuildStatsData(config); + + // Telemetry should be created with synchronousMode = true + _mock.Verify( + m => m.CreateTelemetry( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + true), + Times.Once); + } + [Test] public void PipeName() { diff --git a/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs new file mode 100644 index 00000000..eb9307f3 --- /dev/null +++ b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs @@ -0,0 +1,152 @@ +using System; +using System.Threading; +using NUnit.Framework; +using StatsdClient; +using Tests.Helpers; + +namespace Tests +{ + [TestFixture] + public class SynchronousModeIntegrationTests + { + private readonly int _serverPort = Convert.ToInt32("8127"); + private UdpListener _udpListener; + private Thread _listenThread; + private string _serverName = "127.0.0.1"; + private DogStatsdService _dogStatsdService; + + [OneTimeSetUp] + public void SetupListener() + { + _udpListener = new UdpListener(_serverName, _serverPort); + } + + [OneTimeTearDown] + public void TearDownUdpListener() + { + _udpListener.Dispose(); + } + + [SetUp] + public void Setup() + { + _listenThread = new Thread(new ParameterizedThreadStart(_udpListener.Listen)); + _listenThread.Start(); + + var config = new StatsdConfig + { + StatsdServerName = _serverName, + StatsdPort = _serverPort, + SynchronousMode = true, + OriginDetection = false, + }; + config.ClientSideAggregation = null; + config.Advanced.TelemetryFlushInterval = null; + _dogStatsdService = new DogStatsdService(); + _dogStatsdService.Configure(config); + } + + [TearDown] + public void Cleanup() + { + _udpListener.GetAndClearLastMessages(); + _dogStatsdService.Dispose(); + } + + [Test] + public void Counter() + { + _dogStatsdService.Counter("sync.counter", 42); + _dogStatsdService.Flush(); + AssertWasReceived("sync.counter:42|c"); + } + + [Test] + public void Gauge() + { + _dogStatsdService.Gauge("sync.gauge", 100.5); + _dogStatsdService.Flush(); + AssertWasReceived("sync.gauge:100.5|g"); + } + + [Test] + public void Histogram() + { + _dogStatsdService.Histogram("sync.histogram", 250); + _dogStatsdService.Flush(); + AssertWasReceived("sync.histogram:250|h"); + } + + [Test] + public void Distribution() + { + _dogStatsdService.Distribution("sync.distribution", 30); + _dogStatsdService.Flush(); + AssertWasReceived("sync.distribution:30|d"); + } + + [Test] + public void Timer() + { + _dogStatsdService.Timer("sync.timer", 999); + _dogStatsdService.Flush(); + AssertWasReceived("sync.timer:999|ms"); + } + + [Test] + public void Set() + { + _dogStatsdService.Set("sync.set", "user123"); + _dogStatsdService.Flush(); + AssertWasReceived("sync.set:user123|s"); + } + + [Test] + public void CounterWithTags() + { + _dogStatsdService.Counter("sync.counter", 1, tags: new[] { "env:prod", "region:us" }); + _dogStatsdService.Flush(); + AssertWasReceived("sync.counter:1|c|#env:prod,region:us"); + } + + [Test] + public void Event() + { + _dogStatsdService.Event("Title", "Text"); + _dogStatsdService.Flush(); + AssertWasReceived("_e{5,4}:Title|Text"); + } + + [Test] + public void ServiceCheck() + { + _dogStatsdService.ServiceCheck("my.check", Status.OK); + _dogStatsdService.Flush(); + AssertWasReceived("_sc|my.check|0"); + } + + [Test] + public void DisposeFlushesRemainingMetrics() + { + _dogStatsdService.Counter("sync.dispose.counter", 1); + _dogStatsdService.Dispose(); + + while (_listenThread.IsAlive) + { + } + + var messages = _udpListener.GetAndClearLastMessages(); + Assert.IsNotEmpty(messages); + Assert.That(messages[0], Does.Contain("sync.dispose.counter:1|c")); + } + + private void AssertWasReceived(string shouldBe, int index = 0) + { + while (_listenThread.IsAlive) + { + } + + Assert.AreEqual(shouldBe + "\n", _udpListener.GetAndClearLastMessages()[index]); + } + } +}