Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
namespace StatsdClient.Bufferize
{
/// <summary>
/// StatsBufferize bufferizes metrics before sending them.
/// AsynchronousBufferizedSender bufferizes metrics before sending them.
/// </summary>
internal class StatsBufferize : IDisposable
internal class AsynchronousBufferizedSender : IStatsSender
{
private readonly AsynchronousWorker<Stats> _worker;

public StatsBufferize(
public AsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
Expand Down
17 changes: 17 additions & 0 deletions src/StatsdClient/Bufferize/IStatsSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using StatsdClient.Statistic;

namespace StatsdClient.Bufferize
{
/// <summary>
/// IStatsSender defines the contract for sending stats to the pipeline.
/// </summary>
internal interface IStatsSender : IDisposable
{
bool TryDequeueFromPool(out Stats stats);

void Send(Stats stats);

void Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
namespace StatsdClient.Bufferize
{
/// <summary>
/// IStatsBufferizeFactory is a factory for StatsBufferize.
/// It is used to test StatsBufferize.
/// IStatsSenderFactory is a factory for creating IStatsSender instances.
/// It is used to test StatsdBuilder.
/// </summary>
internal interface IStatsBufferizeFactory
internal interface IStatsSenderFactory
{
StatsBufferize CreateStatsBufferize(
IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
Expand All @@ -38,6 +38,7 @@ Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
Action<Exception> optionalExceptionHandler);
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@

namespace StatsdClient.Bufferize
{
internal class StatsBufferizeFactory : IStatsBufferizeFactory
internal class StatsSenderFactory : IStatsSenderFactory
{
public StatsBufferize CreateStatsBufferize(
public IStatsSender CreateAsynchronousBufferizedSender(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
TimeSpan maxIdleWaitBeforeSending,
Action<Exception> optionalExceptionHandler)
{
return new StatsBufferize(
return new AsynchronousBufferizedSender(
statsRouter,
workerMaxItemCount,
blockingQueueTimeout,
Expand Down Expand Up @@ -49,9 +49,10 @@ public Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
Action<Exception> optionalExceptionHandler)
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false)
{
return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler);
return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler, synchronousMode);
}

public ITransport CreateNamedPipeTransport(string pipeName)
Expand Down
86 changes: 86 additions & 0 deletions src/StatsdClient/Bufferize/SynchronousSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using StatsdClient.Statistic;

namespace StatsdClient.Bufferize
{
/// <summary>
/// SynchronousSender sends metrics synchronously on the calling thread.
/// This is intended for serverless environments (e.g., AWS Lambda) where
/// background threads may be frozen between invocations.
/// </summary>
internal class SynchronousSender : IStatsSender
{
[ThreadStatic]
private static Stats _threadLocalStats;

private readonly StatsRouter _statsRouter;
private readonly Action<Exception> _optionalExceptionHandler;
private readonly object _lock = new object();

public SynchronousSender(StatsRouter statsRouter, Action<Exception> optionalExceptionHandler = null)
{
_statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter));
_optionalExceptionHandler = optionalExceptionHandler;
}

public bool TryDequeueFromPool(out Stats stats)
{
if (_threadLocalStats == null)
{
_threadLocalStats = new Stats();
}

stats = _threadLocalStats;
return true;
}

public void Send(Stats stats)
{
try
{
lock (_lock)
{
_statsRouter.Route(stats);
}
}
catch (Exception e)
{
if (_optionalExceptionHandler != null)
{
_optionalExceptionHandler.Invoke(e);
}
else
{
throw;
}
}
}

public void Flush()
{
try
{
lock (_lock)
{
_statsRouter.Flush();
}
}
catch (Exception e)
{
if (_optionalExceptionHandler != null)
{
_optionalExceptionHandler.Invoke(e);
}
else
{
throw;
}
}
}

public void Dispose()
{
Flush();
}
}
}
2 changes: 1 addition & 1 deletion src/StatsdClient/DogStatsdService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace StatsdClient
/// </summary>
public class DogStatsdService : IDogStatsd, IDisposable
{
private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsBufferizeFactory());
private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsSenderFactory());
private MetricsSender _metricsSender;
private StatsdData _statsdData;
private StatsdConfig _config;
Expand Down
10 changes: 5 additions & 5 deletions src/StatsdClient/MetricsSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ namespace StatsdClient
internal class MetricsSender
{
private readonly Telemetry _optionalTelemetry;
private readonly StatsBufferize _statsBufferize;
private readonly IStatsSender _statsSender;
private readonly bool _truncateIfTooLong;
private readonly IStopWatchFactory _stopwatchFactory;
private readonly IRandomGenerator _randomGenerator;
private readonly Cardinality? _defaultCardinality;

internal MetricsSender(
StatsBufferize statsBufferize,
IStatsSender statsSender,
IRandomGenerator randomGenerator,
IStopWatchFactory stopwatchFactory,
Telemetry optionalTelemetry,
bool truncateIfTooLong,
Cardinality? defaultCardinality = null)
{
_stopwatchFactory = stopwatchFactory;
_statsBufferize = statsBufferize;
_statsSender = statsSender;
_randomGenerator = randomGenerator;
_optionalTelemetry = optionalTelemetry;
_truncateIfTooLong = truncateIfTooLong;
Expand Down Expand Up @@ -142,7 +142,7 @@ public void Send(Action actionToTime, string statName, double sampleRate = 1.0,

private bool TryDequeueStats(out Stats stats)
{
if (_statsBufferize.TryDequeueFromPool(out stats))
if (_statsSender.TryDequeueFromPool(out stats))
{
return true;
}
Expand All @@ -151,6 +151,6 @@ private bool TryDequeueStats(out Stats stats)
return false;
}

private void Send(Stats metricFields) => _statsBufferize.Send(metricFields);
private void Send(Stats metricFields) => _statsSender.Send(metricFields);
}
}
30 changes: 18 additions & 12 deletions src/StatsdClient/StatsdBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ internal class StatsdBuilder
public static readonly string UnixDomainSocketPrefix = "unix://";
private const string _entityIdInternalTagKey = "dd.internal.entity_id";

private readonly IStatsBufferizeFactory _factory;
private readonly IStatsSenderFactory _factory;

public StatsdBuilder(IStatsBufferizeFactory factory)
public StatsdBuilder(IStatsSenderFactory factory)
{
_factory = factory;
}
Expand All @@ -32,24 +32,25 @@ public StatsdData BuildStatsData(StatsdConfig config, Action<Exception> optional

var originDetectionEnabled = IsOriginDetectionEnabled(config);
var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID);
var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler);
var statsBufferize = CreateStatsBufferize(
var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode);
var statsSender = CreateStatsSender(
telemetry,
transportData.Transport,
transportData.BufferCapacity,
config.Advanced,
serializers,
config.ClientSideAggregation,
config.SynchronousMode,
optionalExceptionHandler);

var metricsSender = new MetricsSender(
statsBufferize,
statsSender,
new RandomGenerator(),
new StopWatchFactory(),
telemetry,
config.StatsdTruncateIfTooLong,
config.Cardinality);
return new StatsdData(metricsSender, statsBufferize, transport, telemetry);
return new StatsdData(metricsSender, statsSender, transport, telemetry);
}

private static bool IsOriginDetectionEnabled(StatsdConfig config)
Expand Down Expand Up @@ -179,7 +180,8 @@ private Telemetry CreateTelemetry(
string[] globalTags,
DogStatsdEndPoint dogStatsdEndPoint,
ITransport transport,
Action<Exception> optionalExceptionHandler)
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false)
{
var telemetryFlush = config.Advanced.TelemetryFlushInterval;

Expand All @@ -194,7 +196,7 @@ private Telemetry CreateTelemetry(
telemetryTransport = CreateTransport(optionalTelemetryEndPoint, config);
}

return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler);
return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler, synchronousMode);
}

// Telemetry is not enabled
Expand Down Expand Up @@ -244,13 +246,14 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf
return transportData;
}

private StatsBufferize CreateStatsBufferize(
private IStatsSender CreateStatsSender(
Telemetry telemetry,
ITransport transport,
int bufferCapacity,
AdvancedStatsConfig config,
Serializers serializers,
ClientSideAggregationConfig optionalClientSideAggregationConfig,
bool synchronousMode,
Action<Exception> optionalExceptionHandler)
{
var bufferHandler = new BufferBuilderHandler(telemetry, transport);
Expand All @@ -276,14 +279,17 @@ private StatsBufferize CreateStatsBufferize(

var statsRouter = _factory.CreateStatsRouter(serializers, bufferBuilder, optionalAggregators);

var statsBufferize = _factory.CreateStatsBufferize(
if (synchronousMode)
{
return new SynchronousSender(statsRouter, optionalExceptionHandler);
}

return _factory.CreateAsynchronousBufferizedSender(
statsRouter,
config.MaxMetricsInAsyncQueue,
config.MaxBlockDuration,
config.DurationBeforeSendingNotFullBuffer,
optionalExceptionHandler);

return statsBufferize;
}

private ITransport CreateUDPTransport(DogStatsdEndPoint endPoint)
Expand Down
10 changes: 10 additions & 0 deletions src/StatsdClient/StatsdConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,5 +160,15 @@ public StatsdConfig()
/// This value is used when no cardinality is explicitly specified.
/// </summary>
public Cardinality? Cardinality { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the client operates in synchronous mode.
/// When enabled, metrics are sent directly on the calling thread without background
/// processing. This is recommended for serverless environments (e.g., AWS Lambda)
/// where background threads may be frozen between invocations.
/// Call Flush() at the end of each invocation to ensure all buffered metrics are sent.
/// Default is false.
/// </summary>
public bool SynchronousMode { get; set; } = false;
}
}
14 changes: 7 additions & 7 deletions src/StatsdClient/StatsdData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
private StatsBufferize _statsBufferize;
private IStatsSender _statsSender;

public StatsdData(
MetricsSender metricsSender,
StatsBufferize statsBufferize,
IStatsSender statsSender,
ITransport transport,
Telemetry telemetry)
{
MetricsSender = metricsSender;
Telemetry = telemetry;
_statsBufferize = statsBufferize;
_statsSender = statsSender;
_transport = transport;
}

Expand All @@ -27,7 +27,7 @@ public StatsdData(

public void Flush(bool flushTelemetry)
{
_statsBufferize?.Flush();
_statsSender?.Flush();
if (flushTelemetry)
{
Telemetry.Flush();
Expand All @@ -36,14 +36,14 @@ public void Flush(bool flushTelemetry)

public void Dispose()
{
// _statsBufferize and _telemetry must be disposed before _statsSender to make
// _statsSender and _telemetry must be disposed before _statsSender to make
// sure _statsSender does not receive data when it is already disposed.

Telemetry?.Dispose();
Telemetry = null;

_statsBufferize?.Dispose();
_statsBufferize = null;
_statsSender?.Dispose();
_statsSender = null;

_transport?.Dispose();
_transport = null;
Expand Down
Loading
Loading