From a138a5f74243a86bf22ec9475aaa1a2dfeca9c69 Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Tue, 14 Apr 2026 10:42:18 -0400
Subject: [PATCH 1/3] Add synchronous mode for serverless environments
In AWS Lambda and similar serverless environments, background threads
get frozen when the handler returns, causing buffered metrics to be
lost. This adds a SynchronousMode option that bypasses the async queue
and sends metrics directly on the calling thread.
Changes:
- Extract IStatsBufferize interface from StatsBufferize
- Add SynchronousSender that routes metrics synchronously
- Add StatsdConfig.SynchronousMode property (default false)
- Suppress telemetry background timer in sync mode
- Add unit tests, builder tests, and integration tests
---
src/StatsdClient/Bufferize/IStatsBufferize.cs | 17 ++
.../Bufferize/IStatsBufferizeFactory.cs | 5 +-
src/StatsdClient/Bufferize/StatsBufferize.cs | 2 +-
.../Bufferize/StatsBufferizeFactory.cs | 7 +-
.../Bufferize/SynchronousSender.cs | 56 ++++
src/StatsdClient/MetricsSender.cs | 4 +-
src/StatsdClient/StatsdBuilder.cs | 20 +-
src/StatsdClient/StatsdConfig.cs | 10 +
src/StatsdClient/StatsdData.cs | 4 +-
src/StatsdClient/Telemetry.cs | 16 +-
.../Bufferize/SynchronousSenderTests.cs | 277 ++++++++++++++++++
.../StatsdClient.Tests/StatsdBuilderTests.cs | 60 +++-
.../SynchronousModeIntegrationTests.cs | 152 ++++++++++
13 files changed, 606 insertions(+), 24 deletions(-)
create mode 100644 src/StatsdClient/Bufferize/IStatsBufferize.cs
create mode 100644 src/StatsdClient/Bufferize/SynchronousSender.cs
create mode 100644 tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
create mode 100644 tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs
diff --git a/src/StatsdClient/Bufferize/IStatsBufferize.cs b/src/StatsdClient/Bufferize/IStatsBufferize.cs
new file mode 100644
index 00000000..c6e9744d
--- /dev/null
+++ b/src/StatsdClient/Bufferize/IStatsBufferize.cs
@@ -0,0 +1,17 @@
+using System;
+using StatsdClient.Statistic;
+
+namespace StatsdClient.Bufferize
+{
+ ///
+ /// IStatsBufferize defines the contract for sending stats to the pipeline.
+ ///
+ internal interface IStatsBufferize : IDisposable
+ {
+ bool TryDequeueFromPool(out Stats stats);
+
+ void Send(Stats stats);
+
+ void Flush();
+ }
+}
diff --git a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
index 52667a3d..d3d1f9e5 100644
--- a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
@@ -12,7 +12,7 @@ namespace StatsdClient.Bufferize
///
internal interface IStatsBufferizeFactory
{
- StatsBufferize CreateStatsBufferize(
+ IStatsBufferize CreateStatsBufferize(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
@@ -38,6 +38,7 @@ Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler);
+ Action optionalExceptionHandler,
+ bool synchronousMode = false);
}
}
\ No newline at end of file
diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/StatsBufferize.cs
index 278b89ad..db1e0c97 100644
--- a/src/StatsdClient/Bufferize/StatsBufferize.cs
+++ b/src/StatsdClient/Bufferize/StatsBufferize.cs
@@ -7,7 +7,7 @@ namespace StatsdClient.Bufferize
///
/// StatsBufferize bufferizes metrics before sending them.
///
- internal class StatsBufferize : IDisposable
+ internal class StatsBufferize : IStatsBufferize
{
private readonly AsynchronousWorker _worker;
diff --git a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
index 4ad91585..46d39e58 100644
--- a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
@@ -8,7 +8,7 @@ namespace StatsdClient.Bufferize
{
internal class StatsBufferizeFactory : IStatsBufferizeFactory
{
- public StatsBufferize CreateStatsBufferize(
+ public IStatsBufferize CreateStatsBufferize(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
@@ -49,9 +49,10 @@ public Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
- return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler);
+ return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler, synchronousMode);
}
public ITransport CreateNamedPipeTransport(string pipeName)
diff --git a/src/StatsdClient/Bufferize/SynchronousSender.cs b/src/StatsdClient/Bufferize/SynchronousSender.cs
new file mode 100644
index 00000000..ae183d12
--- /dev/null
+++ b/src/StatsdClient/Bufferize/SynchronousSender.cs
@@ -0,0 +1,56 @@
+using System;
+using StatsdClient.Statistic;
+
+namespace StatsdClient.Bufferize
+{
+ ///
+ /// SynchronousSender sends metrics synchronously on the calling thread.
+ /// This is intended for serverless environments (e.g., AWS Lambda) where
+ /// background threads may be frozen between invocations.
+ ///
+ internal class SynchronousSender : IStatsBufferize
+ {
+ [ThreadStatic]
+ private static Stats _threadLocalStats;
+
+ private readonly StatsRouter _statsRouter;
+ private readonly object _lock = new object();
+
+ public SynchronousSender(StatsRouter statsRouter)
+ {
+ _statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter));
+ }
+
+ public bool TryDequeueFromPool(out Stats stats)
+ {
+ if (_threadLocalStats == null)
+ {
+ _threadLocalStats = new Stats();
+ }
+
+ stats = _threadLocalStats;
+ return true;
+ }
+
+ public void Send(Stats stats)
+ {
+ lock (_lock)
+ {
+ _statsRouter.Route(stats);
+ }
+ }
+
+ public void Flush()
+ {
+ lock (_lock)
+ {
+ _statsRouter.Flush();
+ }
+ }
+
+ public void Dispose()
+ {
+ Flush();
+ }
+ }
+}
diff --git a/src/StatsdClient/MetricsSender.cs b/src/StatsdClient/MetricsSender.cs
index 68c8e195..f1214b39 100644
--- a/src/StatsdClient/MetricsSender.cs
+++ b/src/StatsdClient/MetricsSender.cs
@@ -7,14 +7,14 @@ namespace StatsdClient
internal class MetricsSender
{
private readonly Telemetry _optionalTelemetry;
- private readonly StatsBufferize _statsBufferize;
+ private readonly IStatsBufferize _statsBufferize;
private readonly bool _truncateIfTooLong;
private readonly IStopWatchFactory _stopwatchFactory;
private readonly IRandomGenerator _randomGenerator;
private readonly Cardinality? _defaultCardinality;
internal MetricsSender(
- StatsBufferize statsBufferize,
+ IStatsBufferize statsBufferize,
IRandomGenerator randomGenerator,
IStopWatchFactory stopwatchFactory,
Telemetry optionalTelemetry,
diff --git a/src/StatsdClient/StatsdBuilder.cs b/src/StatsdClient/StatsdBuilder.cs
index f7aa087b..54e23ac8 100644
--- a/src/StatsdClient/StatsdBuilder.cs
+++ b/src/StatsdClient/StatsdBuilder.cs
@@ -32,7 +32,7 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional
var originDetectionEnabled = IsOriginDetectionEnabled(config);
var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID);
- var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler);
+ var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode);
var statsBufferize = CreateStatsBufferize(
telemetry,
transportData.Transport,
@@ -40,6 +40,7 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional
config.Advanced,
serializers,
config.ClientSideAggregation,
+ config.SynchronousMode,
optionalExceptionHandler);
var metricsSender = new MetricsSender(
@@ -179,7 +180,8 @@ private Telemetry CreateTelemetry(
string[] globalTags,
DogStatsdEndPoint dogStatsdEndPoint,
ITransport transport,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
var telemetryFlush = config.Advanced.TelemetryFlushInterval;
@@ -194,7 +196,7 @@ private Telemetry CreateTelemetry(
telemetryTransport = CreateTransport(optionalTelemetryEndPoint, config);
}
- return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler);
+ return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler, synchronousMode);
}
// Telemetry is not enabled
@@ -244,13 +246,14 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf
return transportData;
}
- private StatsBufferize CreateStatsBufferize(
+ private IStatsBufferize CreateStatsBufferize(
Telemetry telemetry,
ITransport transport,
int bufferCapacity,
AdvancedStatsConfig config,
Serializers serializers,
ClientSideAggregationConfig optionalClientSideAggregationConfig,
+ bool synchronousMode,
Action optionalExceptionHandler)
{
var bufferHandler = new BufferBuilderHandler(telemetry, transport);
@@ -276,14 +279,17 @@ private StatsBufferize CreateStatsBufferize(
var statsRouter = _factory.CreateStatsRouter(serializers, bufferBuilder, optionalAggregators);
- var statsBufferize = _factory.CreateStatsBufferize(
+ if (synchronousMode)
+ {
+ return new SynchronousSender(statsRouter);
+ }
+
+ return _factory.CreateStatsBufferize(
statsRouter,
config.MaxMetricsInAsyncQueue,
config.MaxBlockDuration,
config.DurationBeforeSendingNotFullBuffer,
optionalExceptionHandler);
-
- return statsBufferize;
}
private ITransport CreateUDPTransport(DogStatsdEndPoint endPoint)
diff --git a/src/StatsdClient/StatsdConfig.cs b/src/StatsdClient/StatsdConfig.cs
index 3a8c76a3..3e861fe4 100644
--- a/src/StatsdClient/StatsdConfig.cs
+++ b/src/StatsdClient/StatsdConfig.cs
@@ -160,5 +160,15 @@ public StatsdConfig()
/// This value is used when no cardinality is explicitly specified.
///
public Cardinality? Cardinality { get; set; }
+
+ ///
+ /// Gets or sets a value indicating whether the client operates in synchronous mode.
+ /// When enabled, metrics are sent directly on the calling thread without background
+ /// processing. This is recommended for serverless environments (e.g., AWS Lambda)
+ /// where background threads may be frozen between invocations.
+ /// Call Flush() at the end of each invocation to ensure all buffered metrics are sent.
+ /// Default is false.
+ ///
+ public bool SynchronousMode { get; set; } = false;
}
}
diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs
index f8ba7610..28936ed1 100644
--- a/src/StatsdClient/StatsdData.cs
+++ b/src/StatsdClient/StatsdData.cs
@@ -7,11 +7,11 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
- private StatsBufferize _statsBufferize;
+ private IStatsBufferize _statsBufferize;
public StatsdData(
MetricsSender metricsSender,
- StatsBufferize statsBufferize,
+ IStatsBufferize statsBufferize,
ITransport transport,
Telemetry telemetry)
{
diff --git a/src/StatsdClient/Telemetry.cs b/src/StatsdClient/Telemetry.cs
index 15e55040..3366b1db 100644
--- a/src/StatsdClient/Telemetry.cs
+++ b/src/StatsdClient/Telemetry.cs
@@ -40,7 +40,8 @@ public Telemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
_optionalMetricSerializer = metricSerializer;
_optionalTransport = transport;
@@ -53,11 +54,14 @@ public Telemetry(
_aggregatedContexts.Add(MetricType.Count, new ValueWithTags(_optionalTags, "metrics_type:count"));
_aggregatedContexts.Add(MetricType.Set, new ValueWithTags(_optionalTags, "metrics_type:set"));
_optionalExceptionHandler = optionalExceptionHandler;
- _optionalTimer = new Timer(
- _ => Flush(),
- null,
- flushInterval,
- flushInterval);
+ if (!synchronousMode)
+ {
+ _optionalTimer = new Timer(
+ _ => Flush(),
+ null,
+ flushInterval,
+ flushInterval);
+ }
}
public static string MetricsMetricName => _telemetryPrefix + "metrics";
diff --git a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
new file mode 100644
index 00000000..22a34cf5
--- /dev/null
+++ b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
@@ -0,0 +1,277 @@
+using System;
+using System.Collections.Concurrent;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using StatsdClient;
+using StatsdClient.Bufferize;
+using StatsdClient.Statistic;
+using Tests.Utils;
+
+namespace Tests
+{
+ [TestFixture]
+ public class SynchronousSenderTests
+ {
+ [Test]
+ public void TryDequeueFromPoolAlwaysSucceeds()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.IsTrue(sender.TryDequeueFromPool(out var stats));
+ Assert.IsNotNull(stats);
+ }
+ }
+ }
+
+ [Test]
+ public void SendAndFlushDeliversMetrics()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "test.counter";
+ stats.Metric.NumericValue = 42;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ sender.Send(stats);
+
+ // Buffer hasn't been flushed yet (not full)
+ Assert.IsNull(handler.Buffer);
+
+ sender.Flush();
+
+ // After flush, data should be delivered
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("test.counter:42|c\n", handler.BufferToString());
+ }
+ }
+
+ [Test]
+ public void MultipleMetricsBatchInBuffer()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "counter1";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ stats.Metric.StatName = "counter2";
+ stats.Metric.NumericValue = 2;
+ sender.Send(stats);
+
+ // Nothing sent yet
+ Assert.IsNull(handler.Buffer);
+
+ sender.Flush();
+
+ // Both metrics in a single packet
+ Assert.IsNotNull(handler.Buffer);
+ var result = handler.BufferToString();
+ Assert.That(result, Does.Contain("counter1:1|c"));
+ Assert.That(result, Does.Contain("counter2:2|c"));
+ }
+ }
+
+ [Test]
+ public void BufferAutoFlushesWhenFull()
+ {
+ var handler = new BufferBuilderHandlerMock();
+
+ // Small buffer capacity: first metric fits, second triggers auto-flush of the first
+ var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "first.metric";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ // First metric fits in buffer, nothing flushed yet
+ Assert.IsNull(handler.Buffer);
+
+ // Second metric won't fit alongside the first, triggering auto-flush
+ stats.Metric.StatName = "second.metric";
+ stats.Metric.NumericValue = 2;
+ sender.Send(stats);
+
+ // First metric should have been auto-flushed
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("first.metric:1|c\n", handler.BufferToString());
+ }
+ }
+
+ [Test]
+ public void DisposeTriggersFlush()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ var sender = new SynchronousSender(statsRouter);
+
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Gauge;
+ stats.Metric.StatName = "test.gauge";
+ stats.Metric.NumericValue = 100;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ Assert.IsNull(handler.Buffer);
+
+ sender.Dispose();
+
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("test.gauge:100|g\n", handler.BufferToString());
+ }
+
+ [Test]
+ public void ThreadSafety()
+ {
+ var handler = new ConcurrentBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ const int threadCount = 10;
+ const int metricsPerThread = 100;
+ var barrier = new Barrier(threadCount);
+ var tasks = new Task[threadCount];
+
+ for (int t = 0; t < threadCount; t++)
+ {
+ int threadIndex = t;
+ tasks[t] = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (int i = 0; i < metricsPerThread; i++)
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = $"thread{threadIndex}.counter";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+ sender.Flush();
+
+ // Verify no exceptions were thrown and all data was captured
+ var allData = handler.GetAllData();
+ Assert.IsNotEmpty(allData);
+
+ // Verify each thread's metrics appear in the output
+ for (int t = 0; t < threadCount; t++)
+ {
+ Assert.That(allData, Does.Contain($"thread{t}.counter:1|c"));
+ }
+ }
+ }
+
+ [Test]
+ public void FlushWhenEmptyDoesNotThrow()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ Assert.DoesNotThrow(() => sender.Flush());
+ Assert.DoesNotThrow(() => sender.Flush());
+ }
+ }
+
+ ///
+ /// Thread-safe handler that accumulates all buffers for verification.
+ ///
+ private class ConcurrentBufferBuilderHandler : IBufferBuilderHandler
+ {
+ private readonly ConcurrentBag _buffers = new ConcurrentBag();
+
+ public void Handle(byte[] buffer, int length)
+ {
+ var data = new byte[length];
+ Array.Copy(buffer, data, length);
+ _buffers.Add(Encoding.UTF8.GetString(data));
+ }
+
+ public string GetAllData()
+ {
+ var sb = new StringBuilder();
+ foreach (var buf in _buffers)
+ {
+ sb.Append(buf);
+ }
+
+ return sb.ToString();
+ }
+ }
+ }
+}
diff --git a/tests/StatsdClient.Tests/StatsdBuilderTests.cs b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
index a5a00af0..64d6e922 100644
--- a/tests/StatsdClient.Tests/StatsdBuilderTests.cs
+++ b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
@@ -56,6 +56,13 @@ public void Init()
});
_ipEndPoint = null;
+ _mock.Setup(m => m.CreateStatsRouter(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(
+ (s, b, a) => new StatsRouter(s, b, a));
+
foreach (var key in _envVarsKeyToRestore)
{
_envVarsToRestore[key] = Environment.GetEnvironmentVariable(key);
@@ -202,7 +209,8 @@ public void CreateTelemetry()
conf.TelemetryFlushInterval.Value,
It.IsAny(),
It.Is(tags => Enumerable.SequenceEqual(tags, expectedTags)),
- Tools.ExceptionHandler));
+ Tools.ExceptionHandler,
+ false));
}
[Test]
@@ -237,6 +245,56 @@ public void ClientSideAggregation()
It.IsNotNull()));
}
+ [Test]
+ public void SynchronousModeSkipsAsyncBufferize()
+ {
+ var config = new StatsdConfig { };
+ config.SynchronousMode = true;
+ config.Advanced.TelemetryFlushInterval = null;
+
+ BuildStatsData(config);
+
+ // StatsRouter should still be created
+ _mock.Verify(
+ m => m.CreateStatsRouter(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()),
+ Times.Once);
+
+ // CreateStatsBufferize should NOT be called in synchronous mode
+ _mock.Verify(
+ m => m.CreateStatsBufferize(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>()),
+ Times.Never);
+ }
+
+ [Test]
+ public void SynchronousModeTelemetry()
+ {
+ var config = new StatsdConfig { };
+ config.SynchronousMode = true;
+ config.Advanced.TelemetryFlushInterval = TimeSpan.FromMinutes(1);
+
+ BuildStatsData(config);
+
+ // Telemetry should be created with synchronousMode = true
+ _mock.Verify(
+ m => m.CreateTelemetry(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>(),
+ true),
+ Times.Once);
+ }
+
[Test]
public void PipeName()
{
diff --git a/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs
new file mode 100644
index 00000000..eb9307f3
--- /dev/null
+++ b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs
@@ -0,0 +1,152 @@
+using System;
+using System.Threading;
+using NUnit.Framework;
+using StatsdClient;
+using Tests.Helpers;
+
+namespace Tests
+{
+ [TestFixture]
+ public class SynchronousModeIntegrationTests
+ {
+ private readonly int _serverPort = Convert.ToInt32("8127");
+ private UdpListener _udpListener;
+ private Thread _listenThread;
+ private string _serverName = "127.0.0.1";
+ private DogStatsdService _dogStatsdService;
+
+ [OneTimeSetUp]
+ public void SetupListener()
+ {
+ _udpListener = new UdpListener(_serverName, _serverPort);
+ }
+
+ [OneTimeTearDown]
+ public void TearDownUdpListener()
+ {
+ _udpListener.Dispose();
+ }
+
+ [SetUp]
+ public void Setup()
+ {
+ _listenThread = new Thread(new ParameterizedThreadStart(_udpListener.Listen));
+ _listenThread.Start();
+
+ var config = new StatsdConfig
+ {
+ StatsdServerName = _serverName,
+ StatsdPort = _serverPort,
+ SynchronousMode = true,
+ OriginDetection = false,
+ };
+ config.ClientSideAggregation = null;
+ config.Advanced.TelemetryFlushInterval = null;
+ _dogStatsdService = new DogStatsdService();
+ _dogStatsdService.Configure(config);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ _udpListener.GetAndClearLastMessages();
+ _dogStatsdService.Dispose();
+ }
+
+ [Test]
+ public void Counter()
+ {
+ _dogStatsdService.Counter("sync.counter", 42);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.counter:42|c");
+ }
+
+ [Test]
+ public void Gauge()
+ {
+ _dogStatsdService.Gauge("sync.gauge", 100.5);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.gauge:100.5|g");
+ }
+
+ [Test]
+ public void Histogram()
+ {
+ _dogStatsdService.Histogram("sync.histogram", 250);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.histogram:250|h");
+ }
+
+ [Test]
+ public void Distribution()
+ {
+ _dogStatsdService.Distribution("sync.distribution", 30);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.distribution:30|d");
+ }
+
+ [Test]
+ public void Timer()
+ {
+ _dogStatsdService.Timer("sync.timer", 999);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.timer:999|ms");
+ }
+
+ [Test]
+ public void Set()
+ {
+ _dogStatsdService.Set("sync.set", "user123");
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.set:user123|s");
+ }
+
+ [Test]
+ public void CounterWithTags()
+ {
+ _dogStatsdService.Counter("sync.counter", 1, tags: new[] { "env:prod", "region:us" });
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.counter:1|c|#env:prod,region:us");
+ }
+
+ [Test]
+ public void Event()
+ {
+ _dogStatsdService.Event("Title", "Text");
+ _dogStatsdService.Flush();
+ AssertWasReceived("_e{5,4}:Title|Text");
+ }
+
+ [Test]
+ public void ServiceCheck()
+ {
+ _dogStatsdService.ServiceCheck("my.check", Status.OK);
+ _dogStatsdService.Flush();
+ AssertWasReceived("_sc|my.check|0");
+ }
+
+ [Test]
+ public void DisposeFlushesRemainingMetrics()
+ {
+ _dogStatsdService.Counter("sync.dispose.counter", 1);
+ _dogStatsdService.Dispose();
+
+ while (_listenThread.IsAlive)
+ {
+ }
+
+ var messages = _udpListener.GetAndClearLastMessages();
+ Assert.IsNotEmpty(messages);
+ Assert.That(messages[0], Does.Contain("sync.dispose.counter:1|c"));
+ }
+
+ private void AssertWasReceived(string shouldBe, int index = 0)
+ {
+ while (_listenThread.IsAlive)
+ {
+ }
+
+ Assert.AreEqual(shouldBe + "\n", _udpListener.GetAndClearLastMessages()[index]);
+ }
+ }
+}
From 37927bf39cf68db547acba01cc77fc4882581f2b Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Tue, 14 Apr 2026 11:03:14 -0400
Subject: [PATCH 2/3] Forward exceptions to error handler in synchronous mode
In the async path, AsynchronousWorker.Dequeue() catches exceptions from
Route/Flush and forwards them to optionalExceptionHandler. The sync
path was letting exceptions propagate directly into user code, which
could crash Lambda handlers on transport failures or serialization
errors. Wrap Send and Flush in try-catch to match async behavior.
---
.../Bufferize/SynchronousSender.cs | 26 ++++++++---
src/StatsdClient/StatsdBuilder.cs | 2 +-
.../Bufferize/SynchronousSenderTests.cs | 45 +++++++++++++++++++
3 files changed, 67 insertions(+), 6 deletions(-)
diff --git a/src/StatsdClient/Bufferize/SynchronousSender.cs b/src/StatsdClient/Bufferize/SynchronousSender.cs
index ae183d12..2d7e2dc0 100644
--- a/src/StatsdClient/Bufferize/SynchronousSender.cs
+++ b/src/StatsdClient/Bufferize/SynchronousSender.cs
@@ -14,11 +14,13 @@ internal class SynchronousSender : IStatsBufferize
private static Stats _threadLocalStats;
private readonly StatsRouter _statsRouter;
+ private readonly Action _optionalExceptionHandler;
private readonly object _lock = new object();
- public SynchronousSender(StatsRouter statsRouter)
+ public SynchronousSender(StatsRouter statsRouter, Action optionalExceptionHandler = null)
{
_statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter));
+ _optionalExceptionHandler = optionalExceptionHandler;
}
public bool TryDequeueFromPool(out Stats stats)
@@ -34,17 +36,31 @@ public bool TryDequeueFromPool(out Stats stats)
public void Send(Stats stats)
{
- lock (_lock)
+ try
{
- _statsRouter.Route(stats);
+ lock (_lock)
+ {
+ _statsRouter.Route(stats);
+ }
+ }
+ catch (Exception e)
+ {
+ _optionalExceptionHandler?.Invoke(e);
}
}
public void Flush()
{
- lock (_lock)
+ try
+ {
+ lock (_lock)
+ {
+ _statsRouter.Flush();
+ }
+ }
+ catch (Exception e)
{
- _statsRouter.Flush();
+ _optionalExceptionHandler?.Invoke(e);
}
}
diff --git a/src/StatsdClient/StatsdBuilder.cs b/src/StatsdClient/StatsdBuilder.cs
index 54e23ac8..55eb6dc4 100644
--- a/src/StatsdClient/StatsdBuilder.cs
+++ b/src/StatsdClient/StatsdBuilder.cs
@@ -281,7 +281,7 @@ private IStatsBufferize CreateStatsBufferize(
if (synchronousMode)
{
- return new SynchronousSender(statsRouter);
+ return new SynchronousSender(statsRouter, optionalExceptionHandler);
}
return _factory.CreateStatsBufferize(
diff --git a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
index 22a34cf5..83214457 100644
--- a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
+++ b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
@@ -248,6 +248,40 @@ public void FlushWhenEmptyDoesNotThrow()
}
}
+ [Test]
+ public void ExceptionsForwardedToHandler()
+ {
+ var throwingHandler = new ThrowingBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ Exception caughtException = null;
+ Action exceptionHandler = e => caughtException = e;
+
+ using (var sender = new SynchronousSender(statsRouter, exceptionHandler))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "will.fail";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ // Send enough data to trigger a buffer flush (buffer is 20 bytes, metric is ~15)
+ sender.Send(stats);
+ sender.Send(stats);
+
+ // Exception should have been caught by the handler, not propagated
+ Assert.IsNotNull(caughtException);
+ Assert.That(caughtException.Message, Does.Contain("Transport error"));
+ }
+ }
+
///
/// Thread-safe handler that accumulates all buffers for verification.
///
@@ -273,5 +307,16 @@ public string GetAllData()
return sb.ToString();
}
}
+
+ ///
+ /// Handler that throws on Handle to simulate transport failures.
+ ///
+ private class ThrowingBufferBuilderHandler : IBufferBuilderHandler
+ {
+ public void Handle(byte[] buffer, int length)
+ {
+ throw new InvalidOperationException("Transport error");
+ }
+ }
}
}
From 1698f2942b8d78644422e03ea62df11fde254da0 Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 22 Apr 2026 11:28:32 -0400
Subject: [PATCH 3/3] Address PR review feedback: rename types and rethrow
exceptions
Rename IStatsBufferize -> IStatsSender, StatsBufferize -> AsynchronousBufferizedSender,
and factory types for consistency per reviewer suggestion. Update SynchronousSender to
rethrow exceptions when no handler is configured instead of silently swallowing them.
---
...ize.cs => AsynchronousBufferizedSender.cs} | 6 ++---
.../{IStatsBufferize.cs => IStatsSender.cs} | 4 +--
...erizeFactory.cs => IStatsSenderFactory.cs} | 8 +++---
...ferizeFactory.cs => StatsSenderFactory.cs} | 6 ++---
.../Bufferize/SynchronousSender.cs | 20 +++++++++++---
src/StatsdClient/DogStatsdService.cs | 2 +-
src/StatsdClient/MetricsSender.cs | 10 +++----
src/StatsdClient/StatsdBuilder.cs | 14 +++++-----
src/StatsdClient/StatsdData.cs | 14 +++++-----
...s => AsynchronousBufferizedSenderTests.cs} | 8 +++---
.../Bufferize/SynchronousSenderTests.cs | 27 +++++++++++++++++++
.../StatsdClient.Tests/StatsdBuilderTests.cs | 16 +++++------
12 files changed, 88 insertions(+), 47 deletions(-)
rename src/StatsdClient/Bufferize/{StatsBufferize.cs => AsynchronousBufferizedSender.cs} (93%)
rename src/StatsdClient/Bufferize/{IStatsBufferize.cs => IStatsSender.cs} (64%)
rename src/StatsdClient/Bufferize/{IStatsBufferizeFactory.cs => IStatsSenderFactory.cs} (84%)
rename src/StatsdClient/Bufferize/{StatsBufferizeFactory.cs => StatsSenderFactory.cs} (91%)
rename tests/StatsdClient.Tests/Bufferize/{StatsBufferizeTests.cs => AsynchronousBufferizedSenderTests.cs} (82%)
diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
similarity index 93%
rename from src/StatsdClient/Bufferize/StatsBufferize.cs
rename to src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
index db1e0c97..663d9a28 100644
--- a/src/StatsdClient/Bufferize/StatsBufferize.cs
+++ b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
@@ -5,13 +5,13 @@
namespace StatsdClient.Bufferize
{
///
- /// StatsBufferize bufferizes metrics before sending them.
+ /// AsynchronousBufferizedSender bufferizes metrics before sending them.
///
- internal class StatsBufferize : IStatsBufferize
+ internal class AsynchronousBufferizedSender : IStatsSender
{
private readonly AsynchronousWorker _worker;
- public StatsBufferize(
+ public AsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
diff --git a/src/StatsdClient/Bufferize/IStatsBufferize.cs b/src/StatsdClient/Bufferize/IStatsSender.cs
similarity index 64%
rename from src/StatsdClient/Bufferize/IStatsBufferize.cs
rename to src/StatsdClient/Bufferize/IStatsSender.cs
index c6e9744d..8c544ad7 100644
--- a/src/StatsdClient/Bufferize/IStatsBufferize.cs
+++ b/src/StatsdClient/Bufferize/IStatsSender.cs
@@ -4,9 +4,9 @@
namespace StatsdClient.Bufferize
{
///
- /// IStatsBufferize defines the contract for sending stats to the pipeline.
+ /// IStatsSender defines the contract for sending stats to the pipeline.
///
- internal interface IStatsBufferize : IDisposable
+ internal interface IStatsSender : IDisposable
{
bool TryDequeueFromPool(out Stats stats);
diff --git a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs
similarity index 84%
rename from src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
rename to src/StatsdClient/Bufferize/IStatsSenderFactory.cs
index d3d1f9e5..cb4a2a0a 100644
--- a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs
@@ -7,12 +7,12 @@
namespace StatsdClient.Bufferize
{
///
- /// IStatsBufferizeFactory is a factory for StatsBufferize.
- /// It is used to test StatsBufferize.
+ /// IStatsSenderFactory is a factory for creating IStatsSender instances.
+ /// It is used to test StatsdBuilder.
///
- internal interface IStatsBufferizeFactory
+ internal interface IStatsSenderFactory
{
- IStatsBufferize CreateStatsBufferize(
+ IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
diff --git a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/StatsSenderFactory.cs
similarity index 91%
rename from src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
rename to src/StatsdClient/Bufferize/StatsSenderFactory.cs
index 46d39e58..3386f712 100644
--- a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/StatsSenderFactory.cs
@@ -6,16 +6,16 @@
namespace StatsdClient.Bufferize
{
- internal class StatsBufferizeFactory : IStatsBufferizeFactory
+ internal class StatsSenderFactory : IStatsSenderFactory
{
- public IStatsBufferize CreateStatsBufferize(
+ public IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
TimeSpan maxIdleWaitBeforeSending,
Action optionalExceptionHandler)
{
- return new StatsBufferize(
+ return new AsynchronousBufferizedSender(
statsRouter,
workerMaxItemCount,
blockingQueueTimeout,
diff --git a/src/StatsdClient/Bufferize/SynchronousSender.cs b/src/StatsdClient/Bufferize/SynchronousSender.cs
index 2d7e2dc0..fcb0b6f2 100644
--- a/src/StatsdClient/Bufferize/SynchronousSender.cs
+++ b/src/StatsdClient/Bufferize/SynchronousSender.cs
@@ -8,7 +8,7 @@ namespace StatsdClient.Bufferize
/// This is intended for serverless environments (e.g., AWS Lambda) where
/// background threads may be frozen between invocations.
///
- internal class SynchronousSender : IStatsBufferize
+ internal class SynchronousSender : IStatsSender
{
[ThreadStatic]
private static Stats _threadLocalStats;
@@ -45,7 +45,14 @@ public void Send(Stats stats)
}
catch (Exception e)
{
- _optionalExceptionHandler?.Invoke(e);
+ if (_optionalExceptionHandler != null)
+ {
+ _optionalExceptionHandler.Invoke(e);
+ }
+ else
+ {
+ throw;
+ }
}
}
@@ -60,7 +67,14 @@ public void Flush()
}
catch (Exception e)
{
- _optionalExceptionHandler?.Invoke(e);
+ if (_optionalExceptionHandler != null)
+ {
+ _optionalExceptionHandler.Invoke(e);
+ }
+ else
+ {
+ throw;
+ }
}
}
diff --git a/src/StatsdClient/DogStatsdService.cs b/src/StatsdClient/DogStatsdService.cs
index 79c11edb..712e2ffe 100644
--- a/src/StatsdClient/DogStatsdService.cs
+++ b/src/StatsdClient/DogStatsdService.cs
@@ -11,7 +11,7 @@ namespace StatsdClient
///
public class DogStatsdService : IDogStatsd, IDisposable
{
- private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsBufferizeFactory());
+ private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsSenderFactory());
private MetricsSender _metricsSender;
private StatsdData _statsdData;
private StatsdConfig _config;
diff --git a/src/StatsdClient/MetricsSender.cs b/src/StatsdClient/MetricsSender.cs
index f1214b39..f7afb775 100644
--- a/src/StatsdClient/MetricsSender.cs
+++ b/src/StatsdClient/MetricsSender.cs
@@ -7,14 +7,14 @@ namespace StatsdClient
internal class MetricsSender
{
private readonly Telemetry _optionalTelemetry;
- private readonly IStatsBufferize _statsBufferize;
+ private readonly IStatsSender _statsSender;
private readonly bool _truncateIfTooLong;
private readonly IStopWatchFactory _stopwatchFactory;
private readonly IRandomGenerator _randomGenerator;
private readonly Cardinality? _defaultCardinality;
internal MetricsSender(
- IStatsBufferize statsBufferize,
+ IStatsSender statsSender,
IRandomGenerator randomGenerator,
IStopWatchFactory stopwatchFactory,
Telemetry optionalTelemetry,
@@ -22,7 +22,7 @@ internal MetricsSender(
Cardinality? defaultCardinality = null)
{
_stopwatchFactory = stopwatchFactory;
- _statsBufferize = statsBufferize;
+ _statsSender = statsSender;
_randomGenerator = randomGenerator;
_optionalTelemetry = optionalTelemetry;
_truncateIfTooLong = truncateIfTooLong;
@@ -142,7 +142,7 @@ public void Send(Action actionToTime, string statName, double sampleRate = 1.0,
private bool TryDequeueStats(out Stats stats)
{
- if (_statsBufferize.TryDequeueFromPool(out stats))
+ if (_statsSender.TryDequeueFromPool(out stats))
{
return true;
}
@@ -151,6 +151,6 @@ private bool TryDequeueStats(out Stats stats)
return false;
}
- private void Send(Stats metricFields) => _statsBufferize.Send(metricFields);
+ private void Send(Stats metricFields) => _statsSender.Send(metricFields);
}
}
diff --git a/src/StatsdClient/StatsdBuilder.cs b/src/StatsdClient/StatsdBuilder.cs
index 55eb6dc4..a4d89327 100644
--- a/src/StatsdClient/StatsdBuilder.cs
+++ b/src/StatsdClient/StatsdBuilder.cs
@@ -16,9 +16,9 @@ internal class StatsdBuilder
public static readonly string UnixDomainSocketPrefix = "unix://";
private const string _entityIdInternalTagKey = "dd.internal.entity_id";
- private readonly IStatsBufferizeFactory _factory;
+ private readonly IStatsSenderFactory _factory;
- public StatsdBuilder(IStatsBufferizeFactory factory)
+ public StatsdBuilder(IStatsSenderFactory factory)
{
_factory = factory;
}
@@ -33,7 +33,7 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional
var originDetectionEnabled = IsOriginDetectionEnabled(config);
var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID);
var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode);
- var statsBufferize = CreateStatsBufferize(
+ var statsSender = CreateStatsSender(
telemetry,
transportData.Transport,
transportData.BufferCapacity,
@@ -44,13 +44,13 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional
optionalExceptionHandler);
var metricsSender = new MetricsSender(
- statsBufferize,
+ statsSender,
new RandomGenerator(),
new StopWatchFactory(),
telemetry,
config.StatsdTruncateIfTooLong,
config.Cardinality);
- return new StatsdData(metricsSender, statsBufferize, transport, telemetry);
+ return new StatsdData(metricsSender, statsSender, transport, telemetry);
}
private static bool IsOriginDetectionEnabled(StatsdConfig config)
@@ -246,7 +246,7 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf
return transportData;
}
- private IStatsBufferize CreateStatsBufferize(
+ private IStatsSender CreateStatsSender(
Telemetry telemetry,
ITransport transport,
int bufferCapacity,
@@ -284,7 +284,7 @@ private IStatsBufferize CreateStatsBufferize(
return new SynchronousSender(statsRouter, optionalExceptionHandler);
}
- return _factory.CreateStatsBufferize(
+ return _factory.CreateAsynchronousBufferizedSender(
statsRouter,
config.MaxMetricsInAsyncQueue,
config.MaxBlockDuration,
diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs
index 28936ed1..5011cea6 100644
--- a/src/StatsdClient/StatsdData.cs
+++ b/src/StatsdClient/StatsdData.cs
@@ -7,17 +7,17 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
- private IStatsBufferize _statsBufferize;
+ private IStatsSender _statsSender;
public StatsdData(
MetricsSender metricsSender,
- IStatsBufferize statsBufferize,
+ IStatsSender statsSender,
ITransport transport,
Telemetry telemetry)
{
MetricsSender = metricsSender;
Telemetry = telemetry;
- _statsBufferize = statsBufferize;
+ _statsSender = statsSender;
_transport = transport;
}
@@ -27,7 +27,7 @@ public StatsdData(
public void Flush(bool flushTelemetry)
{
- _statsBufferize?.Flush();
+ _statsSender?.Flush();
if (flushTelemetry)
{
Telemetry.Flush();
@@ -36,14 +36,14 @@ public void Flush(bool flushTelemetry)
public void Dispose()
{
- // _statsBufferize and _telemetry must be disposed before _statsSender to make
+ // _statsSender and _telemetry must be disposed before _statsSender to make
// sure _statsSender does not receive data when it is already disposed.
Telemetry?.Dispose();
Telemetry = null;
- _statsBufferize?.Dispose();
- _statsBufferize = null;
+ _statsSender?.Dispose();
+ _statsSender = null;
_transport?.Dispose();
_transport = null;
diff --git a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
similarity index 82%
rename from tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs
rename to tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
index 964c6843..29c6075a 100644
--- a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs
+++ b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
@@ -12,11 +12,11 @@
namespace Tests
{
[TestFixture]
- public class StatsBufferizeTests
+ public class AsynchronousBufferizedSenderTests
{
[Test]
[Timeout(10000)]
- public void StatsBufferize()
+ public void AsynchronousBufferizedSender()
{
var handler = new BufferBuilderHandlerMock();
var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler);
@@ -25,13 +25,13 @@ public void StatsBufferize()
EventSerializer = new EventSerializer(new SerializerHelper(null, null)),
};
var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
- using (var statsBufferize = new StatsBufferize(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler))
+ using (var sender = new AsynchronousBufferizedSender(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler))
{
var stats = new Stats { Kind = StatsKind.Event };
stats.Event.Text = "test";
stats.Event.Title = "title";
- statsBufferize.Send(stats);
+ sender.Send(stats);
while (handler.Buffer == null)
{
Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
diff --git a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
index 83214457..8e0e611c 100644
--- a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
+++ b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
@@ -282,6 +282,33 @@ public void ExceptionsForwardedToHandler()
}
}
+ [Test]
+ public void ExceptionsRethrowWhenNoHandler()
+ {
+ var throwingHandler = new ThrowingBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ var sender = new SynchronousSender(statsRouter);
+
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "will.fail";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ // First send buffers the metric; second triggers a flush which throws
+ sender.Send(stats);
+ var ex = Assert.Throws(() => sender.Send(stats));
+ Assert.That(ex.Message, Does.Contain("Transport error"));
+ }
+
///
/// Thread-safe handler that accumulates all buffers for verification.
///
diff --git a/tests/StatsdClient.Tests/StatsdBuilderTests.cs b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
index 64d6e922..09da0ba7 100644
--- a/tests/StatsdClient.Tests/StatsdBuilderTests.cs
+++ b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
@@ -28,7 +28,7 @@ public class StatsdBuilderTests
StatsdConfig.VersionEnvVar,
};
- private Mock _mock;
+ private Mock _mock;
private StatsdBuilder _statsdBuilder;
private UnixEndPoint _unixEndPoint;
private IPEndPoint _ipEndPoint;
@@ -36,7 +36,7 @@ public class StatsdBuilderTests
[SetUp]
public void Init()
{
- _mock = new Mock(MockBehavior.Loose);
+ _mock = new Mock(MockBehavior.Loose);
_statsdBuilder = new StatsdBuilder(_mock.Object);
_mock.Setup(m => m.CreateUnixDomainSocketTransport(
It.IsAny(),
@@ -129,7 +129,7 @@ public void UDSStatsdServerName()
}
[Test]
- public void CreateStatsBufferizeUDP()
+ public void CreateAsynchronousBufferizedSenderUDP()
{
var config = new StatsdConfig { };
var conf = config.Advanced;
@@ -141,7 +141,7 @@ public void CreateStatsBufferizeUDP()
conf.DurationBeforeSendingNotFullBuffer = TimeSpan.FromMilliseconds(4);
BuildStatsData(config);
- _mock.Verify(m => m.CreateStatsBufferize(
+ _mock.Verify(m => m.CreateAsynchronousBufferizedSender(
It.IsAny(),
conf.MaxMetricsInAsyncQueue,
conf.MaxBlockDuration,
@@ -155,7 +155,7 @@ public void CreateStatsBufferizeUDP()
}
[Test]
- public void CreateStatsBufferizeUDS()
+ public void CreateAsynchronousBufferizedSenderUDS()
{
// Skip on Windows
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
@@ -167,7 +167,7 @@ public void CreateStatsBufferizeUDS()
config.StatsdMaxUnixDomainSocketPacketSize = 20;
BuildStatsData(config);
- _mock.Verify(m => m.CreateStatsBufferize(
+ _mock.Verify(m => m.CreateAsynchronousBufferizedSender(
It.IsAny(),
It.IsAny(),
null,
@@ -262,9 +262,9 @@ public void SynchronousModeSkipsAsyncBufferize()
It.IsAny()),
Times.Once);
- // CreateStatsBufferize should NOT be called in synchronous mode
+ // CreateAsynchronousBufferizedSender should NOT be called in synchronous mode
_mock.Verify(
- m => m.CreateStatsBufferize(
+ m => m.CreateAsynchronousBufferizedSender(
It.IsAny(),
It.IsAny(),
It.IsAny(),