diff --git a/src/StatsdClient/Bufferize/StatsBufferize.cs b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
similarity index 93%
rename from src/StatsdClient/Bufferize/StatsBufferize.cs
rename to src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
index 278b89ad..663d9a28 100644
--- a/src/StatsdClient/Bufferize/StatsBufferize.cs
+++ b/src/StatsdClient/Bufferize/AsynchronousBufferizedSender.cs
@@ -5,13 +5,13 @@
namespace StatsdClient.Bufferize
{
///
- /// StatsBufferize bufferizes metrics before sending them.
+ /// AsynchronousBufferizedSender bufferizes metrics before sending them.
///
- internal class StatsBufferize : IDisposable
+ internal class AsynchronousBufferizedSender : IStatsSender
{
private readonly AsynchronousWorker _worker;
- public StatsBufferize(
+ public AsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
diff --git a/src/StatsdClient/Bufferize/IStatsSender.cs b/src/StatsdClient/Bufferize/IStatsSender.cs
new file mode 100644
index 00000000..8c544ad7
--- /dev/null
+++ b/src/StatsdClient/Bufferize/IStatsSender.cs
@@ -0,0 +1,17 @@
+using System;
+using StatsdClient.Statistic;
+
+namespace StatsdClient.Bufferize
+{
+ ///
+ /// IStatsSender defines the contract for sending stats to the pipeline.
+ ///
+ internal interface IStatsSender : IDisposable
+ {
+ bool TryDequeueFromPool(out Stats stats);
+
+ void Send(Stats stats);
+
+ void Flush();
+ }
+}
diff --git a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs
similarity index 76%
rename from src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
rename to src/StatsdClient/Bufferize/IStatsSenderFactory.cs
index 52667a3d..cb4a2a0a 100644
--- a/src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/IStatsSenderFactory.cs
@@ -7,12 +7,12 @@
namespace StatsdClient.Bufferize
{
///
- /// IStatsBufferizeFactory is a factory for StatsBufferize.
- /// It is used to test StatsBufferize.
+ /// IStatsSenderFactory is a factory for creating IStatsSender instances.
+ /// It is used to test StatsdBuilder.
///
- internal interface IStatsBufferizeFactory
+ internal interface IStatsSenderFactory
{
- StatsBufferize CreateStatsBufferize(
+ IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
@@ -38,6 +38,7 @@ Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler);
+ Action optionalExceptionHandler,
+ bool synchronousMode = false);
}
}
\ No newline at end of file
diff --git a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs b/src/StatsdClient/Bufferize/StatsSenderFactory.cs
similarity index 85%
rename from src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
rename to src/StatsdClient/Bufferize/StatsSenderFactory.cs
index 4ad91585..3386f712 100644
--- a/src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
+++ b/src/StatsdClient/Bufferize/StatsSenderFactory.cs
@@ -6,16 +6,16 @@
namespace StatsdClient.Bufferize
{
- internal class StatsBufferizeFactory : IStatsBufferizeFactory
+ internal class StatsSenderFactory : IStatsSenderFactory
{
- public StatsBufferize CreateStatsBufferize(
+ public IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
TimeSpan maxIdleWaitBeforeSending,
Action optionalExceptionHandler)
{
- return new StatsBufferize(
+ return new AsynchronousBufferizedSender(
statsRouter,
workerMaxItemCount,
blockingQueueTimeout,
@@ -49,9 +49,10 @@ public Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
- return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler);
+ return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler, synchronousMode);
}
public ITransport CreateNamedPipeTransport(string pipeName)
diff --git a/src/StatsdClient/Bufferize/SynchronousSender.cs b/src/StatsdClient/Bufferize/SynchronousSender.cs
new file mode 100644
index 00000000..fcb0b6f2
--- /dev/null
+++ b/src/StatsdClient/Bufferize/SynchronousSender.cs
@@ -0,0 +1,86 @@
+using System;
+using StatsdClient.Statistic;
+
+namespace StatsdClient.Bufferize
+{
+ ///
+ /// SynchronousSender sends metrics synchronously on the calling thread.
+ /// This is intended for serverless environments (e.g., AWS Lambda) where
+ /// background threads may be frozen between invocations.
+ ///
+ internal class SynchronousSender : IStatsSender
+ {
+ [ThreadStatic]
+ private static Stats _threadLocalStats;
+
+ private readonly StatsRouter _statsRouter;
+ private readonly Action _optionalExceptionHandler;
+ private readonly object _lock = new object();
+
+ public SynchronousSender(StatsRouter statsRouter, Action optionalExceptionHandler = null)
+ {
+ _statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter));
+ _optionalExceptionHandler = optionalExceptionHandler;
+ }
+
+ public bool TryDequeueFromPool(out Stats stats)
+ {
+ if (_threadLocalStats == null)
+ {
+ _threadLocalStats = new Stats();
+ }
+
+ stats = _threadLocalStats;
+ return true;
+ }
+
+ public void Send(Stats stats)
+ {
+ try
+ {
+ lock (_lock)
+ {
+ _statsRouter.Route(stats);
+ }
+ }
+ catch (Exception e)
+ {
+ if (_optionalExceptionHandler != null)
+ {
+ _optionalExceptionHandler.Invoke(e);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+
+ public void Flush()
+ {
+ try
+ {
+ lock (_lock)
+ {
+ _statsRouter.Flush();
+ }
+ }
+ catch (Exception e)
+ {
+ if (_optionalExceptionHandler != null)
+ {
+ _optionalExceptionHandler.Invoke(e);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ Flush();
+ }
+ }
+}
diff --git a/src/StatsdClient/DogStatsdService.cs b/src/StatsdClient/DogStatsdService.cs
index 79c11edb..712e2ffe 100644
--- a/src/StatsdClient/DogStatsdService.cs
+++ b/src/StatsdClient/DogStatsdService.cs
@@ -11,7 +11,7 @@ namespace StatsdClient
///
public class DogStatsdService : IDogStatsd, IDisposable
{
- private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsBufferizeFactory());
+ private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsSenderFactory());
private MetricsSender _metricsSender;
private StatsdData _statsdData;
private StatsdConfig _config;
diff --git a/src/StatsdClient/MetricsSender.cs b/src/StatsdClient/MetricsSender.cs
index 68c8e195..f7afb775 100644
--- a/src/StatsdClient/MetricsSender.cs
+++ b/src/StatsdClient/MetricsSender.cs
@@ -7,14 +7,14 @@ namespace StatsdClient
internal class MetricsSender
{
private readonly Telemetry _optionalTelemetry;
- private readonly StatsBufferize _statsBufferize;
+ private readonly IStatsSender _statsSender;
private readonly bool _truncateIfTooLong;
private readonly IStopWatchFactory _stopwatchFactory;
private readonly IRandomGenerator _randomGenerator;
private readonly Cardinality? _defaultCardinality;
internal MetricsSender(
- StatsBufferize statsBufferize,
+ IStatsSender statsSender,
IRandomGenerator randomGenerator,
IStopWatchFactory stopwatchFactory,
Telemetry optionalTelemetry,
@@ -22,7 +22,7 @@ internal MetricsSender(
Cardinality? defaultCardinality = null)
{
_stopwatchFactory = stopwatchFactory;
- _statsBufferize = statsBufferize;
+ _statsSender = statsSender;
_randomGenerator = randomGenerator;
_optionalTelemetry = optionalTelemetry;
_truncateIfTooLong = truncateIfTooLong;
@@ -142,7 +142,7 @@ public void Send(Action actionToTime, string statName, double sampleRate = 1.0,
private bool TryDequeueStats(out Stats stats)
{
- if (_statsBufferize.TryDequeueFromPool(out stats))
+ if (_statsSender.TryDequeueFromPool(out stats))
{
return true;
}
@@ -151,6 +151,6 @@ private bool TryDequeueStats(out Stats stats)
return false;
}
- private void Send(Stats metricFields) => _statsBufferize.Send(metricFields);
+ private void Send(Stats metricFields) => _statsSender.Send(metricFields);
}
}
diff --git a/src/StatsdClient/StatsdBuilder.cs b/src/StatsdClient/StatsdBuilder.cs
index f7aa087b..a4d89327 100644
--- a/src/StatsdClient/StatsdBuilder.cs
+++ b/src/StatsdClient/StatsdBuilder.cs
@@ -16,9 +16,9 @@ internal class StatsdBuilder
public static readonly string UnixDomainSocketPrefix = "unix://";
private const string _entityIdInternalTagKey = "dd.internal.entity_id";
- private readonly IStatsBufferizeFactory _factory;
+ private readonly IStatsSenderFactory _factory;
- public StatsdBuilder(IStatsBufferizeFactory factory)
+ public StatsdBuilder(IStatsSenderFactory factory)
{
_factory = factory;
}
@@ -32,24 +32,25 @@ public StatsdData BuildStatsData(StatsdConfig config, Action optional
var originDetectionEnabled = IsOriginDetectionEnabled(config);
var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID);
- var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler);
- var statsBufferize = CreateStatsBufferize(
+ var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode);
+ var statsSender = CreateStatsSender(
telemetry,
transportData.Transport,
transportData.BufferCapacity,
config.Advanced,
serializers,
config.ClientSideAggregation,
+ config.SynchronousMode,
optionalExceptionHandler);
var metricsSender = new MetricsSender(
- statsBufferize,
+ statsSender,
new RandomGenerator(),
new StopWatchFactory(),
telemetry,
config.StatsdTruncateIfTooLong,
config.Cardinality);
- return new StatsdData(metricsSender, statsBufferize, transport, telemetry);
+ return new StatsdData(metricsSender, statsSender, transport, telemetry);
}
private static bool IsOriginDetectionEnabled(StatsdConfig config)
@@ -179,7 +180,8 @@ private Telemetry CreateTelemetry(
string[] globalTags,
DogStatsdEndPoint dogStatsdEndPoint,
ITransport transport,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
var telemetryFlush = config.Advanced.TelemetryFlushInterval;
@@ -194,7 +196,7 @@ private Telemetry CreateTelemetry(
telemetryTransport = CreateTransport(optionalTelemetryEndPoint, config);
}
- return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler);
+ return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler, synchronousMode);
}
// Telemetry is not enabled
@@ -244,13 +246,14 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf
return transportData;
}
- private StatsBufferize CreateStatsBufferize(
+ private IStatsSender CreateStatsSender(
Telemetry telemetry,
ITransport transport,
int bufferCapacity,
AdvancedStatsConfig config,
Serializers serializers,
ClientSideAggregationConfig optionalClientSideAggregationConfig,
+ bool synchronousMode,
Action optionalExceptionHandler)
{
var bufferHandler = new BufferBuilderHandler(telemetry, transport);
@@ -276,14 +279,17 @@ private StatsBufferize CreateStatsBufferize(
var statsRouter = _factory.CreateStatsRouter(serializers, bufferBuilder, optionalAggregators);
- var statsBufferize = _factory.CreateStatsBufferize(
+ if (synchronousMode)
+ {
+ return new SynchronousSender(statsRouter, optionalExceptionHandler);
+ }
+
+ return _factory.CreateAsynchronousBufferizedSender(
statsRouter,
config.MaxMetricsInAsyncQueue,
config.MaxBlockDuration,
config.DurationBeforeSendingNotFullBuffer,
optionalExceptionHandler);
-
- return statsBufferize;
}
private ITransport CreateUDPTransport(DogStatsdEndPoint endPoint)
diff --git a/src/StatsdClient/StatsdConfig.cs b/src/StatsdClient/StatsdConfig.cs
index 3a8c76a3..3e861fe4 100644
--- a/src/StatsdClient/StatsdConfig.cs
+++ b/src/StatsdClient/StatsdConfig.cs
@@ -160,5 +160,15 @@ public StatsdConfig()
/// This value is used when no cardinality is explicitly specified.
///
public Cardinality? Cardinality { get; set; }
+
+ ///
+ /// Gets or sets a value indicating whether the client operates in synchronous mode.
+ /// When enabled, metrics are sent directly on the calling thread without background
+ /// processing. This is recommended for serverless environments (e.g., AWS Lambda)
+ /// where background threads may be frozen between invocations.
+ /// Call Flush() at the end of each invocation to ensure all buffered metrics are sent.
+ /// Default is false.
+ ///
+ public bool SynchronousMode { get; set; } = false;
}
}
diff --git a/src/StatsdClient/StatsdData.cs b/src/StatsdClient/StatsdData.cs
index f8ba7610..5011cea6 100644
--- a/src/StatsdClient/StatsdData.cs
+++ b/src/StatsdClient/StatsdData.cs
@@ -7,17 +7,17 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
- private StatsBufferize _statsBufferize;
+ private IStatsSender _statsSender;
public StatsdData(
MetricsSender metricsSender,
- StatsBufferize statsBufferize,
+ IStatsSender statsSender,
ITransport transport,
Telemetry telemetry)
{
MetricsSender = metricsSender;
Telemetry = telemetry;
- _statsBufferize = statsBufferize;
+ _statsSender = statsSender;
_transport = transport;
}
@@ -27,7 +27,7 @@ public StatsdData(
public void Flush(bool flushTelemetry)
{
- _statsBufferize?.Flush();
+ _statsSender?.Flush();
if (flushTelemetry)
{
Telemetry.Flush();
@@ -36,14 +36,14 @@ public void Flush(bool flushTelemetry)
public void Dispose()
{
- // _statsBufferize and _telemetry must be disposed before _statsSender to make
+ // _statsSender and _telemetry must be disposed before _statsSender to make
// sure _statsSender does not receive data when it is already disposed.
Telemetry?.Dispose();
Telemetry = null;
- _statsBufferize?.Dispose();
- _statsBufferize = null;
+ _statsSender?.Dispose();
+ _statsSender = null;
_transport?.Dispose();
_transport = null;
diff --git a/src/StatsdClient/Telemetry.cs b/src/StatsdClient/Telemetry.cs
index 15e55040..3366b1db 100644
--- a/src/StatsdClient/Telemetry.cs
+++ b/src/StatsdClient/Telemetry.cs
@@ -40,7 +40,8 @@ public Telemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
- Action optionalExceptionHandler)
+ Action optionalExceptionHandler,
+ bool synchronousMode = false)
{
_optionalMetricSerializer = metricSerializer;
_optionalTransport = transport;
@@ -53,11 +54,14 @@ public Telemetry(
_aggregatedContexts.Add(MetricType.Count, new ValueWithTags(_optionalTags, "metrics_type:count"));
_aggregatedContexts.Add(MetricType.Set, new ValueWithTags(_optionalTags, "metrics_type:set"));
_optionalExceptionHandler = optionalExceptionHandler;
- _optionalTimer = new Timer(
- _ => Flush(),
- null,
- flushInterval,
- flushInterval);
+ if (!synchronousMode)
+ {
+ _optionalTimer = new Timer(
+ _ => Flush(),
+ null,
+ flushInterval,
+ flushInterval);
+ }
}
public static string MetricsMetricName => _telemetryPrefix + "metrics";
diff --git a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
similarity index 82%
rename from tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs
rename to tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
index 964c6843..29c6075a 100644
--- a/tests/StatsdClient.Tests/Bufferize/StatsBufferizeTests.cs
+++ b/tests/StatsdClient.Tests/Bufferize/AsynchronousBufferizedSenderTests.cs
@@ -12,11 +12,11 @@
namespace Tests
{
[TestFixture]
- public class StatsBufferizeTests
+ public class AsynchronousBufferizedSenderTests
{
[Test]
[Timeout(10000)]
- public void StatsBufferize()
+ public void AsynchronousBufferizedSender()
{
var handler = new BufferBuilderHandlerMock();
var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler);
@@ -25,13 +25,13 @@ public void StatsBufferize()
EventSerializer = new EventSerializer(new SerializerHelper(null, null)),
};
var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
- using (var statsBufferize = new StatsBufferize(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler))
+ using (var sender = new AsynchronousBufferizedSender(statsRouter, 10, null, TimeSpan.Zero, Tools.ExceptionHandler))
{
var stats = new Stats { Kind = StatsKind.Event };
stats.Event.Text = "test";
stats.Event.Title = "title";
- statsBufferize.Send(stats);
+ sender.Send(stats);
while (handler.Buffer == null)
{
Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
diff --git a/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
new file mode 100644
index 00000000..8e0e611c
--- /dev/null
+++ b/tests/StatsdClient.Tests/Bufferize/SynchronousSenderTests.cs
@@ -0,0 +1,349 @@
+using System;
+using System.Collections.Concurrent;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using StatsdClient;
+using StatsdClient.Bufferize;
+using StatsdClient.Statistic;
+using Tests.Utils;
+
+namespace Tests
+{
+ [TestFixture]
+ public class SynchronousSenderTests
+ {
+ [Test]
+ public void TryDequeueFromPoolAlwaysSucceeds()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.IsTrue(sender.TryDequeueFromPool(out var stats));
+ Assert.IsNotNull(stats);
+ }
+ }
+ }
+
+ [Test]
+ public void SendAndFlushDeliversMetrics()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "test.counter";
+ stats.Metric.NumericValue = 42;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ sender.Send(stats);
+
+ // Buffer hasn't been flushed yet (not full)
+ Assert.IsNull(handler.Buffer);
+
+ sender.Flush();
+
+ // After flush, data should be delivered
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("test.counter:42|c\n", handler.BufferToString());
+ }
+ }
+
+ [Test]
+ public void MultipleMetricsBatchInBuffer()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "counter1";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ stats.Metric.StatName = "counter2";
+ stats.Metric.NumericValue = 2;
+ sender.Send(stats);
+
+ // Nothing sent yet
+ Assert.IsNull(handler.Buffer);
+
+ sender.Flush();
+
+ // Both metrics in a single packet
+ Assert.IsNotNull(handler.Buffer);
+ var result = handler.BufferToString();
+ Assert.That(result, Does.Contain("counter1:1|c"));
+ Assert.That(result, Does.Contain("counter2:2|c"));
+ }
+ }
+
+ [Test]
+ public void BufferAutoFlushesWhenFull()
+ {
+ var handler = new BufferBuilderHandlerMock();
+
+ // Small buffer capacity: first metric fits, second triggers auto-flush of the first
+ var bufferBuilder = new BufferBuilder(handler, 30, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "first.metric";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ // First metric fits in buffer, nothing flushed yet
+ Assert.IsNull(handler.Buffer);
+
+ // Second metric won't fit alongside the first, triggering auto-flush
+ stats.Metric.StatName = "second.metric";
+ stats.Metric.NumericValue = 2;
+ sender.Send(stats);
+
+ // First metric should have been auto-flushed
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("first.metric:1|c\n", handler.BufferToString());
+ }
+ }
+
+ [Test]
+ public void DisposeTriggersFlush()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ var sender = new SynchronousSender(statsRouter);
+
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Gauge;
+ stats.Metric.StatName = "test.gauge";
+ stats.Metric.NumericValue = 100;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+
+ Assert.IsNull(handler.Buffer);
+
+ sender.Dispose();
+
+ Assert.IsNotNull(handler.Buffer);
+ Assert.AreEqual("test.gauge:100|g\n", handler.BufferToString());
+ }
+
+ [Test]
+ public void ThreadSafety()
+ {
+ var handler = new ConcurrentBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ const int threadCount = 10;
+ const int metricsPerThread = 100;
+ var barrier = new Barrier(threadCount);
+ var tasks = new Task[threadCount];
+
+ for (int t = 0; t < threadCount; t++)
+ {
+ int threadIndex = t;
+ tasks[t] = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (int i = 0; i < metricsPerThread; i++)
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = $"thread{threadIndex}.counter";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+ sender.Send(stats);
+ }
+ });
+ }
+
+ Task.WaitAll(tasks);
+ sender.Flush();
+
+ // Verify no exceptions were thrown and all data was captured
+ var allData = handler.GetAllData();
+ Assert.IsNotEmpty(allData);
+
+ // Verify each thread's metrics appear in the output
+ for (int t = 0; t < threadCount; t++)
+ {
+ Assert.That(allData, Does.Contain($"thread{t}.counter:1|c"));
+ }
+ }
+ }
+
+ [Test]
+ public void FlushWhenEmptyDoesNotThrow()
+ {
+ var handler = new BufferBuilderHandlerMock();
+ var bufferBuilder = new BufferBuilder(handler, 1024, "\n", Tools.ExceptionHandler);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ using (var sender = new SynchronousSender(statsRouter))
+ {
+ Assert.DoesNotThrow(() => sender.Flush());
+ Assert.DoesNotThrow(() => sender.Flush());
+ }
+ }
+
+ [Test]
+ public void ExceptionsForwardedToHandler()
+ {
+ var throwingHandler = new ThrowingBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ Exception caughtException = null;
+ Action exceptionHandler = e => caughtException = e;
+
+ using (var sender = new SynchronousSender(statsRouter, exceptionHandler))
+ {
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "will.fail";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ // Send enough data to trigger a buffer flush (buffer is 20 bytes, metric is ~15)
+ sender.Send(stats);
+ sender.Send(stats);
+
+ // Exception should have been caught by the handler, not propagated
+ Assert.IsNotNull(caughtException);
+ Assert.That(caughtException.Message, Does.Contain("Transport error"));
+ }
+ }
+
+ [Test]
+ public void ExceptionsRethrowWhenNoHandler()
+ {
+ var throwingHandler = new ThrowingBufferBuilderHandler();
+ var bufferBuilder = new BufferBuilder(throwingHandler, 20, "\n", null);
+ var serializers = new Serializers
+ {
+ MetricSerializer = new MetricSerializer(new SerializerHelper(null, null), string.Empty),
+ };
+ var statsRouter = new StatsRouter(serializers, bufferBuilder, null);
+
+ var sender = new SynchronousSender(statsRouter);
+
+ sender.TryDequeueFromPool(out var stats);
+ stats.Kind = StatsKind.Metric;
+ stats.Metric.MetricType = MetricType.Count;
+ stats.Metric.StatName = "will.fail";
+ stats.Metric.NumericValue = 1;
+ stats.Metric.SampleRate = 1.0;
+ stats.Metric.Tags = null;
+
+ // First send buffers the metric; second triggers a flush which throws
+ sender.Send(stats);
+ var ex = Assert.Throws(() => sender.Send(stats));
+ Assert.That(ex.Message, Does.Contain("Transport error"));
+ }
+
+ ///
+ /// Thread-safe handler that accumulates all buffers for verification.
+ ///
+ private class ConcurrentBufferBuilderHandler : IBufferBuilderHandler
+ {
+ private readonly ConcurrentBag _buffers = new ConcurrentBag();
+
+ public void Handle(byte[] buffer, int length)
+ {
+ var data = new byte[length];
+ Array.Copy(buffer, data, length);
+ _buffers.Add(Encoding.UTF8.GetString(data));
+ }
+
+ public string GetAllData()
+ {
+ var sb = new StringBuilder();
+ foreach (var buf in _buffers)
+ {
+ sb.Append(buf);
+ }
+
+ return sb.ToString();
+ }
+ }
+
+ ///
+ /// Handler that throws on Handle to simulate transport failures.
+ ///
+ private class ThrowingBufferBuilderHandler : IBufferBuilderHandler
+ {
+ public void Handle(byte[] buffer, int length)
+ {
+ throw new InvalidOperationException("Transport error");
+ }
+ }
+ }
+}
diff --git a/tests/StatsdClient.Tests/StatsdBuilderTests.cs b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
index a5a00af0..09da0ba7 100644
--- a/tests/StatsdClient.Tests/StatsdBuilderTests.cs
+++ b/tests/StatsdClient.Tests/StatsdBuilderTests.cs
@@ -28,7 +28,7 @@ public class StatsdBuilderTests
StatsdConfig.VersionEnvVar,
};
- private Mock _mock;
+ private Mock _mock;
private StatsdBuilder _statsdBuilder;
private UnixEndPoint _unixEndPoint;
private IPEndPoint _ipEndPoint;
@@ -36,7 +36,7 @@ public class StatsdBuilderTests
[SetUp]
public void Init()
{
- _mock = new Mock(MockBehavior.Loose);
+ _mock = new Mock(MockBehavior.Loose);
_statsdBuilder = new StatsdBuilder(_mock.Object);
_mock.Setup(m => m.CreateUnixDomainSocketTransport(
It.IsAny(),
@@ -56,6 +56,13 @@ public void Init()
});
_ipEndPoint = null;
+ _mock.Setup(m => m.CreateStatsRouter(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(
+ (s, b, a) => new StatsRouter(s, b, a));
+
foreach (var key in _envVarsKeyToRestore)
{
_envVarsToRestore[key] = Environment.GetEnvironmentVariable(key);
@@ -122,7 +129,7 @@ public void UDSStatsdServerName()
}
[Test]
- public void CreateStatsBufferizeUDP()
+ public void CreateAsynchronousBufferizedSenderUDP()
{
var config = new StatsdConfig { };
var conf = config.Advanced;
@@ -134,7 +141,7 @@ public void CreateStatsBufferizeUDP()
conf.DurationBeforeSendingNotFullBuffer = TimeSpan.FromMilliseconds(4);
BuildStatsData(config);
- _mock.Verify(m => m.CreateStatsBufferize(
+ _mock.Verify(m => m.CreateAsynchronousBufferizedSender(
It.IsAny(),
conf.MaxMetricsInAsyncQueue,
conf.MaxBlockDuration,
@@ -148,7 +155,7 @@ public void CreateStatsBufferizeUDP()
}
[Test]
- public void CreateStatsBufferizeUDS()
+ public void CreateAsynchronousBufferizedSenderUDS()
{
// Skip on Windows
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
@@ -160,7 +167,7 @@ public void CreateStatsBufferizeUDS()
config.StatsdMaxUnixDomainSocketPacketSize = 20;
BuildStatsData(config);
- _mock.Verify(m => m.CreateStatsBufferize(
+ _mock.Verify(m => m.CreateAsynchronousBufferizedSender(
It.IsAny(),
It.IsAny(),
null,
@@ -202,7 +209,8 @@ public void CreateTelemetry()
conf.TelemetryFlushInterval.Value,
It.IsAny(),
It.Is(tags => Enumerable.SequenceEqual(tags, expectedTags)),
- Tools.ExceptionHandler));
+ Tools.ExceptionHandler,
+ false));
}
[Test]
@@ -237,6 +245,56 @@ public void ClientSideAggregation()
It.IsNotNull()));
}
+ [Test]
+ public void SynchronousModeSkipsAsyncBufferize()
+ {
+ var config = new StatsdConfig { };
+ config.SynchronousMode = true;
+ config.Advanced.TelemetryFlushInterval = null;
+
+ BuildStatsData(config);
+
+ // StatsRouter should still be created
+ _mock.Verify(
+ m => m.CreateStatsRouter(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()),
+ Times.Once);
+
+ // CreateAsynchronousBufferizedSender should NOT be called in synchronous mode
+ _mock.Verify(
+ m => m.CreateAsynchronousBufferizedSender(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>()),
+ Times.Never);
+ }
+
+ [Test]
+ public void SynchronousModeTelemetry()
+ {
+ var config = new StatsdConfig { };
+ config.SynchronousMode = true;
+ config.Advanced.TelemetryFlushInterval = TimeSpan.FromMinutes(1);
+
+ BuildStatsData(config);
+
+ // Telemetry should be created with synchronousMode = true
+ _mock.Verify(
+ m => m.CreateTelemetry(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>(),
+ true),
+ Times.Once);
+ }
+
[Test]
public void PipeName()
{
diff --git a/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs
new file mode 100644
index 00000000..eb9307f3
--- /dev/null
+++ b/tests/StatsdClient.Tests/SynchronousModeIntegrationTests.cs
@@ -0,0 +1,152 @@
+using System;
+using System.Threading;
+using NUnit.Framework;
+using StatsdClient;
+using Tests.Helpers;
+
+namespace Tests
+{
+ [TestFixture]
+ public class SynchronousModeIntegrationTests
+ {
+ private readonly int _serverPort = Convert.ToInt32("8127");
+ private UdpListener _udpListener;
+ private Thread _listenThread;
+ private string _serverName = "127.0.0.1";
+ private DogStatsdService _dogStatsdService;
+
+ [OneTimeSetUp]
+ public void SetupListener()
+ {
+ _udpListener = new UdpListener(_serverName, _serverPort);
+ }
+
+ [OneTimeTearDown]
+ public void TearDownUdpListener()
+ {
+ _udpListener.Dispose();
+ }
+
+ [SetUp]
+ public void Setup()
+ {
+ _listenThread = new Thread(new ParameterizedThreadStart(_udpListener.Listen));
+ _listenThread.Start();
+
+ var config = new StatsdConfig
+ {
+ StatsdServerName = _serverName,
+ StatsdPort = _serverPort,
+ SynchronousMode = true,
+ OriginDetection = false,
+ };
+ config.ClientSideAggregation = null;
+ config.Advanced.TelemetryFlushInterval = null;
+ _dogStatsdService = new DogStatsdService();
+ _dogStatsdService.Configure(config);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ _udpListener.GetAndClearLastMessages();
+ _dogStatsdService.Dispose();
+ }
+
+ [Test]
+ public void Counter()
+ {
+ _dogStatsdService.Counter("sync.counter", 42);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.counter:42|c");
+ }
+
+ [Test]
+ public void Gauge()
+ {
+ _dogStatsdService.Gauge("sync.gauge", 100.5);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.gauge:100.5|g");
+ }
+
+ [Test]
+ public void Histogram()
+ {
+ _dogStatsdService.Histogram("sync.histogram", 250);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.histogram:250|h");
+ }
+
+ [Test]
+ public void Distribution()
+ {
+ _dogStatsdService.Distribution("sync.distribution", 30);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.distribution:30|d");
+ }
+
+ [Test]
+ public void Timer()
+ {
+ _dogStatsdService.Timer("sync.timer", 999);
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.timer:999|ms");
+ }
+
+ [Test]
+ public void Set()
+ {
+ _dogStatsdService.Set("sync.set", "user123");
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.set:user123|s");
+ }
+
+ [Test]
+ public void CounterWithTags()
+ {
+ _dogStatsdService.Counter("sync.counter", 1, tags: new[] { "env:prod", "region:us" });
+ _dogStatsdService.Flush();
+ AssertWasReceived("sync.counter:1|c|#env:prod,region:us");
+ }
+
+ [Test]
+ public void Event()
+ {
+ _dogStatsdService.Event("Title", "Text");
+ _dogStatsdService.Flush();
+ AssertWasReceived("_e{5,4}:Title|Text");
+ }
+
+ [Test]
+ public void ServiceCheck()
+ {
+ _dogStatsdService.ServiceCheck("my.check", Status.OK);
+ _dogStatsdService.Flush();
+ AssertWasReceived("_sc|my.check|0");
+ }
+
+ [Test]
+ public void DisposeFlushesRemainingMetrics()
+ {
+ _dogStatsdService.Counter("sync.dispose.counter", 1);
+ _dogStatsdService.Dispose();
+
+ while (_listenThread.IsAlive)
+ {
+ }
+
+ var messages = _udpListener.GetAndClearLastMessages();
+ Assert.IsNotEmpty(messages);
+ Assert.That(messages[0], Does.Contain("sync.dispose.counter:1|c"));
+ }
+
+ private void AssertWasReceived(string shouldBe, int index = 0)
+ {
+ while (_listenThread.IsAlive)
+ {
+ }
+
+ Assert.AreEqual(shouldBe + "\n", _udpListener.GetAndClearLastMessages()[index]);
+ }
+ }
+}